【dbt小ネタ】 ログの集計 : incremental モデルの実運用 (upsert, リカバリ手法や自動復旧の実現)

カヤックSREの池田です。
最近、日本のデータエンジニアリング界隈でのdbt(Data build tool)の活用がじわじわと盛り上がってきています。
dbtはpythonのJinjaテンプレートを利用したSQLの拡張を実現し、ETL処理のT(データ変換)に関して強力な機能を提供してくれます。
dbt自体の詳しい説明などは、インターネット上に増えてきていますのでそちらにおまかせするとして、本記事ではdbtを使い慣れてきた人向けの小ネタを話します。

今回は『ログの集計』を例にincrementalモデルを運用する上での問題とその解決方法を紹介します。

www.getdbt.com

まずはじめに

dbt では モデルと呼ばれる*.sql とスキーマと呼ばれる*.ymlを記述することになります。

例えば、以下のようなsourceのスキーマがあるとします。
models/staging/log/_src_log.yml:

sources:
  - name: log
    schema: source__log
    tables:
      - name: nginx_access_logs
        description: nginxが出力するログを取り込んだもの
        columns:
          - name: time
            description: アクセス時刻 [timestamp without time zone]
          - name: method
            description: リクエストメソッド [varchar(8)]
          - name: uri
            description: リクエストURL [varchar(max)]
          - name: user_id 
            description: ログインユーザーのID [varchar(256)]

このsourceを利用したシンプルなDAUを計算するモデルとして、以下のようなSQLを記述することがdbtではできます。
models/staging/log/base/base__log_daily_active_users.sql:

{{ config(
    materialized="table",
    schema="kpi"
) }}

select 
    {{ dbt.safe_cast(dbt.date_trunc('day', '"time"'), api.Column.translate_type("date")) }} as ymd
    ,count(distinct user_id) active_users
from {{ source('log', 'nginx_access_logs') }}
group by 1

想定する状況として、このモデルを1日1回実行しているとします。(スケジュール実行の方法は環境によって色々方法が変わるので割愛します)
このモデルは毎日洗替されるのですが、ある時に日々のDAUを差分更新*1したいという要求がでてきたとします。
おそらく、以下のように改変することになるでしょう。

--- before 
+++ after
@@ -1,5 +1,5 @@
 {{ config(
-    materialized="table",
+    materialized="incremental",
     schema="kpi"
 ) }}
 
@@ -7,4 +7,7 @@
     {{ dbt.safe_cast(dbt.date_trunc('day', '"time"'), api.Column.translate_type("date")) }} as ymd
     ,count(distinct user_id) daily_active_users
 from {{ source('log', 'nginx_access_logs') }}
+{%- if is_incremental() %}
+where {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
+{%- endif %}
 group by 1

こうすることで、初回実行時はログのすべてを使って算出しテーブルを新規作成。次回実行時は本日分のみ使って算出し追記することになります。

ここまでは、dbtの基本的な内容になるとは思います。実はこのincrementalモデルを実際に運用する上では色々と問題が起きます。 本記事で、運用上の問題とその解決方法について紹介していこうと思います。

実運用をしてみよう。

スタート地点になるdbtのモデルをもう一度提示します。

{{ config(
    materialized="incremental",
    schema="kpi"
) }}

select
    {{ dbt.safe_cast(dbt.date_trunc('day', '"time"'), api.Column.translate_type("date")) }} as ymd
    ,count(distinct user_id) daily_active_users
from {{ source('log', 'nginx_access_logs') }}
{%- if is_incremental() %}
where {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
{%- endif %}
group by 1

さて、このモデルに対してスキーマを追記、テストの追加をします。

version: 2

models:
  - name: base__log_daily_active_users
    description: 日々のDAUを記録したテーブル
    columns:
      - name: ymd
        description: 日付
        tests:
          - unique
          - not_null
      - name: daily_active_users
        description: DAUの数値
        tests:
          - not_null 

この状態で実際に運用すると、どのような問題が起きるのでしょうか?

1日に複数回実行された場合 (冪等性の確保)

このモデル、実は 1日に1回だけ実行される ことが暗黙の前提となっています。
では、1日に複数回実行するとどうなるでしょう。 答えは、 ymd カラムの unique テストが失敗します。
つまり、同じ日が重複して記録されてしまいます。 なぜなら、incrementalモデルのデフォルトは追記するという挙動だからです。

この問題をどうやって解決するのでしょうか? これは一般に upsert という処理の実現をする話になります。

upsert とは、すでに行があればUpdate、なければInsertをする処理のことを言います。
Databaseによっては、このupsertは機能として提供されています。(例えば BigQueryの場合はMERGE文を使うことで実現できます。)
このupsert は、Databaseごとに実現方法が異なるため、今までであればDatabaseに対する詳しい知識が必要な応用的な処理でした。 しかし、dbtではDatabaseによらず同一の記述で実現できます。 なぜなら、dbtがDatabaseごとの差異を吸収して対象のDatabase向けの処理に書き換えてくれるからです。
これがdbtを使う魅力の一つになります。

dbtでupsertを実現する方法は、configにunique_keyを指定するだけです。

--- before
+++ after
@@ -1,5 +1,6 @@
 {{ config(
     materialized="incremental",
+    unique_key=['ymd'],
     schema="kpi"
 ) }}

こうすることで、上記のincrementalモデルはupsertによる更新になります。 実行時点ですでに対象の日付の行があれば、daily_active_users列を更新するという挙動になります。
1日に何回実行しても、ログが追記されてなければ最終的な結果は同じになるということになります。
こういう風な性質のことを冪等性といいます。incrementalモデルを使う場合は冪等性の確保を念頭に置くと運用しやすくなります。

日にちを開けて実行された場合

冪等性を確保したことで、一日に何回実行しても問題はなくなりました。 ところが、1日に1回実行される という暗黙の前提は、別の切り口でも問題をおこします。 それは、ある1日が何かしらの問題で、実行されなかった場合です。この場合、その実行されなかった日のデータが欠損してしまいます。 よくあるケースとして、ログの到着遅延、dbt実行のruntimeの問題、ログの出力先の変更、クラウドプロバイダー側の障害、etc...です。 実際に運用すると、毎日確実に実行されるという前提条件の担保が難しく感じることがあるでしょう。 dbtではモデルを工夫することでデータの欠損への対応も容易にすることができます。

データの欠損への一つの対応方法として、varを使った指定日の再処理という方法があります。 モデルを以下のように変更します。

--- before
+++ after
@@ -9,6 +9,12 @@
     ,count(distinct user_id) daily_active_users
 from {{ source('log', 'nginx_access_logs') }}
 {%- if is_incremental() %}
-where {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
+where
+    {%- set target_date = var('target_date','') %}
+    {%- if target_date != '' %}
+    {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.safe_cast("'"~target_date~"'", api.Column.translate_type("date")) }}
+    {%- else %}
+    {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
+    {%- endif %}
 {%- endif %}
 group by 1

このモデルを使って、次のようなコマンドを実行することである特定の日だけ再処理を実行することが可能になります。

$ dbt build --select base__log_daily_active_users --var 'target_date: 2022-10-11'

この指定日の再処理の仕組みは、様々な状況のためのリカバリ手法として使うことができます。

また、何かしらが原因で失敗し続けて、あるときに復旧したという状況を想定したときに、 自動で最後の日から再計算してくれる機構があれば、運用上すごく便利になります。なぜなら、手動で再処理をする必要がないからです。
例えば次のように変更します。

--- before
+++ after
@@ -14,7 +14,15 @@
     {%- if target_date != '' %}
     {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.safe_cast("'"~target_date~"'", api.Column.translate_type("date")) }}
     {%- else %}
-    {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
+        {%- set get_last_ymd_sql %}
+            select max(ymd) from {{ this }}
+        {%- endset %}
+        {%- set last_ymd = run_query(get_last_ymd_sql).columns[0].values() | first %}
+        {%- if last_ymd is not none %}
+            {{ dbt.date_trunc('day', '"time"') }} >= {{ dbt.safe_cast("'"~last_ymd~"'", api.Column.translate_type("date")) }}
+        {%- else %}
+            {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
+        {%- endif %}
     {%- endif %}
 {%- endif %}
 group by 1

上記の変更によって、2回目以降の実行で、target_date が与えられていない場合は既存のテーブルに対して

select max(ymd) from {{ this }}

を実行して、その結果を用いてSQLを描画・実行します。

モデルが複雑になって、分かりづらいとは思います。ですので、実際の描画されたSQLを見てみましょう。

select
    cast(date_trunc('day', "time") as date) as ymd
    ,count(distinct user_id) daily_active_users
from "prod"."source__log"."web_access_logs"
where date_trunc('day', "time") >= cast('2022-10-26' as date)
group by 1

このSQLは2022-10-26 以降の更新が停止していたと仮定した場合の描画結果です。 このようにすることで、ある程度の自動復旧が可能になります。

おわりに

本記事は、dbtのincrementalの利用方法の中で、基本から1歩踏み込んだ実運用的な話をしました。 実際に完成したincrementalモデルは以下のようになります。

{{ config(
    materialized="incremental",
    unique_key=['ymd'],
    schema="kpi"
) }}

select
    {{ dbt.safe_cast(dbt.date_trunc('day', '"time"'), api.Column.translate_type("date")) }} as ymd
    ,count(distinct user_id) daily_active_users
from {{ source('log', 'nginx_access_logs') }}
{%- if is_incremental() %}
where
    {%- set target_date = var('target_date','') %}
    {%- if target_date != '' %}
    {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.safe_cast("'"~target_date~"'", api.Column.translate_type("date")) }}
    {%- else %}
        {%- set get_last_ymd_sql %}
            select max(ymd) from {{ this }}
        {%- endset %}
        {%- set last_ymd = run_query(get_last_ymd_sql).columns[0].values() | first %}
        {%- if last_ymd is not none %}
            {{ dbt.date_trunc('day', '"time"') }} >= {{ dbt.safe_cast("'"~last_ymd~"'", api.Column.translate_type("date")) }}
        {%- else %}
            {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
        {%- endif %}
    {%- endif %}
{%- endif %}
group by 1

上記のようなモデルを作成することで、冪等性の確保や障害時のリカバリが容易になります。 このようにdbtを活用することで複雑なログ集計の処理を、一つのSQLファイルとして表現し、 実際の蓄積しているデータの状況に合わせて複数のパターンのSQLを描画・実行できるようになりました。 ログの処理は、差分更新をしたくなるケースが良くあります。そのため、蓄積しているデータの状況に合わせて、複数パターンのSQLを描画・実行できるdbtは非常に強力な味方となります。 Jinjaテンプレート記法を多数使うことになるので、モデルの開発には慣れが必要ではありますが、ぜひ皆様も使ってみてください。

カヤックでは、継続的なデータの運用に興味があるエンジニアも募集しています。

hubspot.kayac.com

*1:ログのテーブルが肥大化して洗替では実行時間が長くなる等の理由で、前回からの変更分のみ計算して更新するということ