【dbt小ネタ】 ログの集計 : incremental モデルの実運用 (upsert, リカバリ手法や自動復旧の実現)

カヤックSREの池田です。
最近、日本のデータエンジニアリング界隈でのdbt(Data build tool)の活用がじわじわと盛り上がってきています。
dbtはpythonのJinjaテンプレートを利用したSQLの拡張を実現し、ETL処理のT(データ変換)に関して強力な機能を提供してくれます。
dbt自体の詳しい説明などは、インターネット上に増えてきていますのでそちらにおまかせするとして、本記事ではdbtを使い慣れてきた人向けの小ネタを話します。

今回は『ログの集計』を例にincrementalモデルを運用する上での問題とその解決方法を紹介します。

www.getdbt.com

まずはじめに

dbt では モデルと呼ばれる*.sql とスキーマと呼ばれる*.ymlを記述することになります。

例えば、以下のようなsourceのスキーマがあるとします。
models/staging/log/_src_log.yml:

sources:
  - name: log
    schema: source__log
    tables:
      - name: nginx_access_logs
        description: nginxが出力するログを取り込んだもの
        columns:
          - name: time
            description: アクセス時刻 [timestamp without time zone]
          - name: method
            description: リクエストメソッド [varchar(8)]
          - name: uri
            description: リクエストURL [varchar(max)]
          - name: user_id 
            description: ログインユーザーのID [varchar(256)]

このsourceを利用したシンプルなDAUを計算するモデルとして、以下のようなSQLを記述することがdbtではできます。
models/staging/log/base/base__log_daily_active_users.sql:

{{ config(
    materialized="table",
    schema="kpi"
) }}

select 
    {{ dbt.safe_cast(dbt.date_trunc('day', '"time"'), api.Column.translate_type("date")) }} as ymd
    ,count(distinct user_id) active_users
from {{ source('log', 'nginx_access_logs') }}
group by 1

想定する状況として、このモデルを1日1回実行しているとします。(スケジュール実行の方法は環境によって色々方法が変わるので割愛します)
このモデルは毎日洗替されるのですが、ある時に日々のDAUを差分更新*1したいという要求がでてきたとします。
おそらく、以下のように改変することになるでしょう。

--- before 
+++ after
@@ -1,5 +1,5 @@
 {{ config(
-    materialized="table",
+    materialized="incremental",
     schema="kpi"
 ) }}
 
@@ -7,4 +7,7 @@
     {{ dbt.safe_cast(dbt.date_trunc('day', '"time"'), api.Column.translate_type("date")) }} as ymd
     ,count(distinct user_id) daily_active_users
 from {{ source('log', 'nginx_access_logs') }}
+{%- if is_incremental() %}
+where {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
+{%- endif %}
 group by 1

こうすることで、初回実行時はログのすべてを使って算出しテーブルを新規作成。次回実行時は本日分のみ使って算出し追記することになります。

ここまでは、dbtの基本的な内容になるとは思います。実はこのincrementalモデルを実際に運用する上では色々と問題が起きます。 本記事で、運用上の問題とその解決方法について紹介していこうと思います。

実運用をしてみよう。

スタート地点になるdbtのモデルをもう一度提示します。

{{ config(
    materialized="incremental",
    schema="kpi"
) }}

select
    {{ dbt.safe_cast(dbt.date_trunc('day', '"time"'), api.Column.translate_type("date")) }} as ymd
    ,count(distinct user_id) daily_active_users
from {{ source('log', 'nginx_access_logs') }}
{%- if is_incremental() %}
where {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
{%- endif %}
group by 1

さて、このモデルに対してスキーマを追記、テストの追加をします。

version: 2

models:
  - name: base__log_daily_active_users
    description: 日々のDAUを記録したテーブル
    columns:
      - name: ymd
        description: 日付
        tests:
          - unique
          - not_null
      - name: daily_active_users
        description: DAUの数値
        tests:
          - not_null 

この状態で実際に運用すると、どのような問題が起きるのでしょうか?

1日に複数回実行された場合 (冪等性の確保)

このモデル、実は 1日に1回だけ実行される ことが暗黙の前提となっています。
では、1日に複数回実行するとどうなるでしょう。 答えは、 ymd カラムの unique テストが失敗します。
つまり、同じ日が重複して記録されてしまいます。 なぜなら、incrementalモデルのデフォルトは追記するという挙動だからです。

この問題をどうやって解決するのでしょうか? これは一般に upsert という処理の実現をする話になります。

upsert とは、すでに行があればUpdate、なければInsertをする処理のことを言います。
Databaseによっては、このupsertは機能として提供されています。(例えば BigQueryの場合はMERGE文を使うことで実現できます。)
このupsert は、Databaseごとに実現方法が異なるため、今までであればDatabaseに対する詳しい知識が必要な応用的な処理でした。 しかし、dbtではDatabaseによらず同一の記述で実現できます。 なぜなら、dbtがDatabaseごとの差異を吸収して対象のDatabase向けの処理に書き換えてくれるからです。
これがdbtを使う魅力の一つになります。

dbtでupsertを実現する方法は、configにunique_keyを指定するだけです。

--- before
+++ after
@@ -1,5 +1,6 @@
 {{ config(
     materialized="incremental",
+    unique_key=['ymd'],
     schema="kpi"
 ) }}

こうすることで、上記のincrementalモデルはupsertによる更新になります。 実行時点ですでに対象の日付の行があれば、daily_active_users列を更新するという挙動になります。
1日に何回実行しても、ログが追記されてなければ最終的な結果は同じになるということになります。
こういう風な性質のことを冪等性といいます。incrementalモデルを使う場合は冪等性の確保を念頭に置くと運用しやすくなります。

日にちを開けて実行された場合

冪等性を確保したことで、一日に何回実行しても問題はなくなりました。 ところが、1日に1回実行される という暗黙の前提は、別の切り口でも問題をおこします。 それは、ある1日が何かしらの問題で、実行されなかった場合です。この場合、その実行されなかった日のデータが欠損してしまいます。 よくあるケースとして、ログの到着遅延、dbt実行のruntimeの問題、ログの出力先の変更、クラウドプロバイダー側の障害、etc...です。 実際に運用すると、毎日確実に実行されるという前提条件の担保が難しく感じることがあるでしょう。 dbtではモデルを工夫することでデータの欠損への対応も容易にすることができます。

データの欠損への一つの対応方法として、varを使った指定日の再処理という方法があります。 モデルを以下のように変更します。

--- before
+++ after
@@ -9,6 +9,12 @@
     ,count(distinct user_id) daily_active_users
 from {{ source('log', 'nginx_access_logs') }}
 {%- if is_incremental() %}
-where {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
+where
+    {%- set target_date = var('target_date','') %}
+    {%- if target_date != '' %}
+    {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.safe_cast("'"~target_date~"'", api.Column.translate_type("date")) }}
+    {%- else %}
+    {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
+    {%- endif %}
 {%- endif %}
 group by 1

このモデルを使って、次のようなコマンドを実行することである特定の日だけ再処理を実行することが可能になります。

$ dbt build --select base__log_daily_active_users --var 'target_date: 2022-10-11'

この指定日の再処理の仕組みは、様々な状況のためのリカバリ手法として使うことができます。

また、何かしらが原因で失敗し続けて、あるときに復旧したという状況を想定したときに、 自動で最後の日から再計算してくれる機構があれば、運用上すごく便利になります。なぜなら、手動で再処理をする必要がないからです。
例えば次のように変更します。

--- before
+++ after
@@ -14,7 +14,15 @@
     {%- if target_date != '' %}
     {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.safe_cast("'"~target_date~"'", api.Column.translate_type("date")) }}
     {%- else %}
-    {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
+        {%- set get_last_ymd_sql %}
+            select max(ymd) from {{ this }}
+        {%- endset %}
+        {%- set last_ymd = run_query(get_last_ymd_sql).columns[0].values() | first %}
+        {%- if last_ymd is not none %}
+            {{ dbt.date_trunc('day', '"time"') }} >= {{ dbt.safe_cast("'"~last_ymd~"'", api.Column.translate_type("date")) }}
+        {%- else %}
+            {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
+        {%- endif %}
     {%- endif %}
 {%- endif %}
 group by 1

上記の変更によって、2回目以降の実行で、target_date が与えられていない場合は既存のテーブルに対して

select max(ymd) from {{ this }}

を実行して、その結果を用いてSQLを描画・実行します。

モデルが複雑になって、分かりづらいとは思います。ですので、実際の描画されたSQLを見てみましょう。

select
    cast(date_trunc('day', "time") as date) as ymd
    ,count(distinct user_id) daily_active_users
from "prod"."source__log"."web_access_logs"
where date_trunc('day', "time") >= cast('2022-10-26' as date)
group by 1

このSQLは2022-10-26 以降の更新が停止していたと仮定した場合の描画結果です。 このようにすることで、ある程度の自動復旧が可能になります。

おわりに

本記事は、dbtのincrementalの利用方法の中で、基本から1歩踏み込んだ実運用的な話をしました。 実際に完成したincrementalモデルは以下のようになります。

{{ config(
    materialized="incremental",
    unique_key=['ymd'],
    schema="kpi"
) }}

select
    {{ dbt.safe_cast(dbt.date_trunc('day', '"time"'), api.Column.translate_type("date")) }} as ymd
    ,count(distinct user_id) daily_active_users
from {{ source('log', 'nginx_access_logs') }}
{%- if is_incremental() %}
where
    {%- set target_date = var('target_date','') %}
    {%- if target_date != '' %}
    {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.safe_cast("'"~target_date~"'", api.Column.translate_type("date")) }}
    {%- else %}
        {%- set get_last_ymd_sql %}
            select max(ymd) from {{ this }}
        {%- endset %}
        {%- set last_ymd = run_query(get_last_ymd_sql).columns[0].values() | first %}
        {%- if last_ymd is not none %}
            {{ dbt.date_trunc('day', '"time"') }} >= {{ dbt.safe_cast("'"~last_ymd~"'", api.Column.translate_type("date")) }}
        {%- else %}
            {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
        {%- endif %}
    {%- endif %}
{%- endif %}
group by 1

上記のようなモデルを作成することで、冪等性の確保や障害時のリカバリが容易になります。 このようにdbtを活用することで複雑なログ集計の処理を、一つのSQLファイルとして表現し、 実際の蓄積しているデータの状況に合わせて複数のパターンのSQLを描画・実行できるようになりました。 ログの処理は、差分更新をしたくなるケースが良くあります。そのため、蓄積しているデータの状況に合わせて、複数パターンのSQLを描画・実行できるdbtは非常に強力な味方となります。 Jinjaテンプレート記法を多数使うことになるので、モデルの開発には慣れが必要ではありますが、ぜひ皆様も使ってみてください。

カヤックでは、継続的なデータの運用に興味があるエンジニアも募集しています。

hubspot.kayac.com

*1:ログのテーブルが肥大化して洗替では実行時間が長くなる等の理由で、前回からの変更分のみ計算して更新するということ

Terraform管理されたステージング環境・本番環境の差異を検出したくて頑張っている話

SREチームの橋本です。今回はステージング環境の運用でありがちな本番との差分に対処する試みを紹介します。

背景

ステージング環境について、例えばIT用語辞典では

ステージング環境とは、情報システムやソフトウェアの開発の最終段階で検証用に用意される、実際の運用環境と変わらない環境のこと。

と説明しています。検証用ですから、インフラ面で言っても本番環境となるべく一致した構成であってほしいということになります。

しかし実際にはさまざまな経緯(ステージング環境を後から立てたり!)から、たとえTerraform管理していたとしても差異が発生してしまうことがあります。

こうしたとき、その差異を検出する一つの方法としてはTerraformの.tfファイルを比較することですが、これにもいろいろな書き方がありえます。

例えばaws_db_proxy_endpointはterraform-provider-awsのv3.38.0で追加されましたが、それまではRoute 53でRDS proxyのエンドポイントを指定する際など自前で書いてあげる必要がありました。こうしたバージョンアップの対応が環境によってずれてしまうことは往々にして起こりえます。

そうしたTerraform側の変化がない場合でも、データソース(data)を使わずにARNを組み立てていたり、デフォルト値を明示したりしていなかったり、リソースを書く順番が違ったり……などチームで運用しているとどうしても細かい違いが生まれてしまうものです。

.tfファイルの文法はHCL(HashiCorp Configuration Language)と呼ばれる言語になっていて、リソースを書く順番だとかファイルの分け方だとかについてはHCLとして解釈して比較することで吸収できるかもしれません。 しかしHCLはかなりシンプルな言語で、.tfで当たり前のように使っているaws_route53_zone.foobar.nameといったリソースの持つ値の参照は実はHCLの機能ではないのです。それゆえHCLとして解釈するだけでは多くの表記揺れを吸収できず、Terraformが行っているリソース参照などを自力で再現する羽目になってしまいます。

そうした手間を省こうと思うと、.tfではなく実際の値が記録されているtfstateを比べることになります。

tfstateならそのまま比較できるわけではなくARNをTerraform上の名前に戻すなどの処理は必要ですが、.tfの高度な分析を行うよりは手間が少ないと考えられます。やはりterraform planと似たようなものを実装することになり、気が進まないという欠点はありますが。

ツールのイメージ

tfstateはファイルに出力できるのでこれを入力とします。また、後述しますがスキーマ情報も必要となるのでこれも追加して

> command_name schema.json left_tfstate.json right_tfstate.json

といった形で実行するCLIツールを考えます。出力としては

common resources: 100
with diff: 30
left only resources: 20
right only resources: 10

といった形で比較できたリソース、そのうち差分があったリソース、片方にしかなかったリソースの数を示すサマリー、そして

compare data.aws_ecs_service.foo
  /service_name : "foo" -> "foo-v2"

compare data.aws_ecs_service.bar
  /desired_count : 1 -> 2

……

といった個々の差分の表示があれば、とりあえず人間が読む分には十分と言えるでしょう。

実装

まずterraformコマンドにはshowというサブコマンドがあり、terraform show -jsonとすればJSON形式でtfstateが取得できます。

{
  "format_version": "1.0",
  "terraform_version": "1.2.9",
  "values": {
    "root_module": {
      "resources": [
        {
          "address": "aws_cloudwatch_log_group.app",
          "mode": "managed",
          "type": "aws_cloudwatch_log_group",
          "name": "app",
          "provider_name": "registry.terraform.io/hashicorp/aws",
          "schema_version": 0,
          "values": {
            "arn": "arn:aws:logs:ap-northeast-1:************:log-group:docker/app",
            "id": "docker/app",
            "kms_key_id": "",
            "name": "docker/app",
            "name_prefix": null,
            "retention_in_days": 365,
            "tags": {},
            "tags_all": {
              "ManagedBy": "terraform"
            }
          },
          "sensitive_values": {
            "tags": {},
            "tags_all": {}
          }
        },
        ……
      ]
    },
    "child_modules": {
      "resources": [
        ……
      ]
    }
  }
}

フォーマットもちゃんと公開されているのでこの出力をjsondiffで比較するのが基本的な方針となりますが、問題となるのがtfstate上はarguments(.tfで指定するもの)とattributes(自動的に決まり、他から参照される値)に区別がないことです。

例えば上に示したaws_cloudwatch_log_groupについて、arnはユーザーには決められないattributeですが、他のargumentsと一緒にvaluesに含まれています。

そこでもう一つ、追加の情報としてプロバイダー固有のスキーマ情報が必要になります。通常はあまり使わないサブコマンドなのですがterraform providers schemaというものがあり、これによってtfstateの要素ごとの性質が分かります。

{
  "format_version": "1.0",
  "provider_schemas": {
    "registry.terraform.io/hashicorp/aws": {
      "provider": {
        "version": 0,
        "block": {
          ……
        }
      },
      "resource_schemas": {
        "aws_cloudwatch_log_group": {
          "version": 0,
          "block": {
            "attributes": {
              "arn": {
                "type": "string",
                "description_kind": "plain",
                "computed": true
              },
              "id": {
                "type": "string",
                "description_kind": "plain",
                "optional": true,
                "computed": true
              },
              "kms_key_id": {
                "type": "string",
                "description_kind": "plain",
                "optional": true
              },
              "name": {
                "type": "string",
                "description_kind": "plain",
                "optional": true,
                "computed": true
              },
              "name_prefix": {
                "type": "string",
                "description_kind": "plain",
                "optional": true
              },
              "retention_in_days": {
                "type": "number",
                "description_kind": "plain",
                "optional": true
              },
              "tags": {
                "type": [
                  "map",
                  "string"
                ],
                "description_kind": "plain",
                "optional": true
              },
              "tags_all": {
                "type": [
                  "map",
                  "string"
                ],
                "description_kind": "plain",
                "optional": true,
                "computed": true
              }
            },
            "description_kind": "plain"
          }
        },
        ……
      },
      "data_source_schemas": {
        ……
      }
    }
  }
}

中でも特にcomputedoptionalが重要で、

  • computedでない→必ず指定する必要がある
  • optionalである→人間が指定する(こともある)

のどちらかを満たせばargumentsと判定することができます。 (optionalが真だと指定がない場合に自動で値が入るのでcomputedは真になっています。)

スキーマについてもaws_cloudwatch_log_groupを見ると、例えばarncomputedでありoptionalではないことからattributeと分かります。 ただしidは人間が指定しない、というかドキュメントにも書かれていないものですが、例外的にargumentと同じ性質になっているようです。

この他にも以下のような処理を行い、なるべく人間に優しい差分を出力するように努めています。

  • ARNなどの識別子をリソース名に変換する
  • スキーマを見てset(順序がない)の場合はソートする
  • jsondiffの出力からstg-(ステージング)とprod-(プロダクション=本番)のような差分を無視する
  • ポリシー類のjsonを再帰的に比較する

実際の環境で実行してみる

実際にあるプロジェクトのtfstateでステージングと本番を比較してみたところ、サマリーとしては以下のようになりました。

common resources: 202
with diff: 98
left only resources: 234
right only resources: 317

leftがステージング、rightが本番です。left/right onlyの比較できなかったリソースがかなりありますが、中身を見るとステージング側でfor_eachにより整理した部分で、リソース名が食い違って比較対象が見つからなかったようです。(このステージング環境も後追いで構築された部分があるので、その構築の際に整理したものですね。)

比較対象を自力で探すのは大変なので、リソースの命名については一致しているのを期待したいところですが、この程度は自動で検知した方が良いかもしれません。

検知された差分の中身を見ていくと以下のようなものが見られました。

compare aws_lb.main
  /access_logs/0/bucket : "logs-********-stg" -> "logs.********"

これはバケット名に.を使わないことが推奨されているのを受けてステージングで命名規則を変更したようです。

compare aws_iam_role.lambda_sqs2worker
  compare /assume_role_policy:
    /Statement/0/Sid : "" -> null
  /managed_policy_arns/1 : "aws_iam_policy.logs_wo" -> "aws_iam_policy.lambda_function"
  /managed_policy_arns/2 : "aws_iam_policy.sqs_rw" -> "aws_iam_policy.logs_wo"
  /managed_policy_arns/3 : "aws_iam_policy.ssm_params_ro" -> "aws_iam_policy.sqs_rw"
  /managed_policy_arns/4 : "aws_iam_policy.vpc_access" -> "aws_iam_policy.ssm_param_ro"
  /managed_policy_arns/- : null -> "aws_iam_policy.vpc_access"

表示がちょっと分かりにくいですがaws_iam_policy.lambda_functionという汎用っぽいポリシーがステージングでは消えています。 これもステージング構築の際に権限を整理したものと思われます。

compare aws_elasticache_replication_group.main
  /description : "main replication group for stg" -> "prod replication group"
  /engine_version : "5.0.6" -> "3.2.10"
  /node_type : "cache.t4g.small" -> "cache.r4.large"
  /replication_group_description : "main replication group for stg" -> "prod replication group"
  /timeouts : null -> {"create":null,"delete":null,"update":null}

本質的な差はエンジンバージョンとノードタイプですね。 バージョン3系は2023年7月でEoLなので、その前にアップデートして本番側もバージョンが上がる予定となっています…。

他にもIPアドレスだとか内部的なIDだとかの違いから差分が出たり、"s3:Get*"["s3:Get*"]のような細かい差分が出たりも見られました。 こういう部分も自動で無視したり、無視する設定をデフォルトのconfigで用意したりしたいですね。

まとめ

まだPoC的な段階ではありますが、tfstateの比較によってステージング環境を本番と相同に保つ試みについて書かせて頂きました。

現状では今あるものを比較する限りですが、将来的には「PRができた段階でterraform planの結果を評価する」などの取り組みにも挑戦したいところです。具体的にはサマリー部分の数値をterraform plan前後で比べることで、そのPRによってステージングと本番が「より近づいたか」「より離れたか」を定量的に評価することが可能となります。 また例えばステージングに先に反映した内容を本番にも入れようとしたのに差分が減らなかったら、何か一方に設定ミスがあるということになります。

ミスがないか確かめる、というのは明確なゴールがないので果てがなく、心理的にもコストが高くなりがちなタスクです。そうした部分こそ自動化を図ることで運用しやすい環境、運用しやすいプロダクトを目指したいと考えています。

カヤックでは、信頼できる開発環境運用に興味があるエンジニアも募集しています。

hubspot.kayac.com