dbtのテンプレートSQLをJinja2テンプレートで大量生成する話 〜クラシコム様での事例〜

この記事はdbt Advent Calendar 2023の5日目です。
こんにちは、その他事業部SREチーム所属の@mashiikeです。

カヤックは様々な事業・プロジェクトを展開しておりますが、その一つとして『北欧、暮らしの道具店』を運営する株式会社クラシコムとの協業プロジェクトがあります。 www.kayac.com

こちらのプロジェクトでは2019年より継続して、クラシコム様のデータ基盤の構築・運用のサポートの一部を行っております。 その中で、troccoのdbt連携機能を用いて、データの変換を実装しております。1
今回の記事は、同プロジェクトの中で行われた一風変わったdbtの活用例の紹介になります。 内容の関係上、予めLookerの用語と概念を知っていると読みやすいと思います。 cloud.google.com

背景

クラシコム様のデータ分析基盤では、ビジネスインテリジェンスにLookerを採用しており、Lookerを用いたレポートが多数存在しています。 2022年10月頃まで、データマートはLookMLのPersistent Derived Tables(PDT) を用いて実装されることが多くありました。 どのようにしてデータマートの実装をしていたかというと、Looker Forumsに投稿された『How to: Simple incremental PDTs using create_process』を応用して作っていました。 (2019年12月頃に日本人コミュニティで翻訳されたものがこちらになります。) より具体的には、次のようなLookMLによるETLパイプラインが多く存在していることになります。

view: extractor {
  sql_table_name: log.xxx
  filter: target_date {
    type: date
    default_value: "before yesterday"
  }
  dimension: event_date {}
  dimension: user_id {}
  mesure uu {
    type: count_distinct
    sql: ${user_id}
   }
}

explore source {
  sql_always_where: {% condition target_date %}event_date{% endcondition %}
}

view transformer__all {
  derived_table: {
    explore_source: extractor {
      filters: [
        extractor.target_date: "before yesterday",
      ]
      timezone: Asia/Tokyo
      column: event_date {}
      column: uu {}
    }
  }
}

view transformer__latest {
  derived_table: {
    explore_source: extractor {
      filters: [
        extractor.target_date: "after 5 days ago",
      ]
      timezone: Asia/Tokyo
      column: event_date {}
      column: uu {}
    }
  }
}

view loader {
  derived_table: {
    create_process: {
      sql_step:
        CREATE TABLE IF NOT EXISTS datamarts.xxxx
        PARTITION BY date(event_date)
        AS SELECT *
        FROM ${transformer__all.SQL_TABLE_NAME}
      ;;
      sql_step:
        INSERT INTO datamarts.xxxx
        SELECT *
        FROM ${transformer__latest.SQL_TABLE_NAME}
      ;;
      sql_step:
        CREATE VIEW IF NOT EXISTS ${SQL_TABLE_NAME}
        AS SELECT * FROM datamarts.xxxx
      ;;
    }
    datagroup_trigger: daily
  }
}

explore loader {}

上記のようなLookMLを用いたETLが多数存在していたのですが、管理・運用が困難になってきたので2022年10月頃よりtroccoを用いたdbtによるETLパイプラインの移行プロジェクトが始まりました。 ちなみに、上記のLookMLをdbtのテンプレートSQLに変換すると、大体こんな感じになると思います。

{{
    config(
        materialized='incremental',
        on_schema_change="append_new_columns",
        incremental_strategy="insert_overwrite",
        unique_key=['event_date'],
        partition_by={
            'field': 'event_date',
            'data_type': 'date',
            'granularity': 'day',
        },
    )
}}
SELECT 
    event_date,
    COUNT(distinct user_id) as uu
FROM {{ source('log','xxx') }}
{%- if is_incremental() %}
WHERE event_date >= {{ dbt_utils.dateadd(day, -5, 'CURRENT_DATE("Asia/Tokyo")') }}
{%- endif %}
GROUP BY 1

dbtで書くとだいぶ簡潔になりますね。
上記の例では、データ変換のロジックが簡単ですが、実際にはとても複雑なデータ変換が行われています。 複雑なデータ変換をLookMLのderived_tableから、dbtのテンプレートSQLに移行することで、LookMLを簡潔にするということが、この移行プロジェクトの目的の一つです。 さて、ここまでの背景を踏まえて、今回の記事の本題に入ります。

BigQueryから HTTP Status 413 Request Entity Too Large が返ってきた。

移行を進めていく中で、あるデータマートにて厄介な問題に遭遇しました。

  • あるレポート用のデータマートが、普段の探索分析用のExplore2を参照していた。
  • 探索分析用のExploreは9種類のパラメータを持っている複雑なderived_tableで構成されていた。
  • 9種類のパラメータのいくつかの組み合わせをレポートとして出力していた。

レポート用のデータマートだけを移行しても良いのですが、LookMLを簡潔にするという目的もあるので、 探索分析用のExploreで使っている複雑なderived_tableもdbtを使って予めデータマート化しておきくなりました。

そこで、探索分析用のデータマートを作ろうとするのですが、9種類のパラメータというのが厄介になります。  

ここで言うところのパラメータとは、次のようなものです。

  • 集計時間枠(timeframe)の設定: daily, weekly, monthly, quarterly,yearly
  • 集計プラットフォームの設定: 全部まとめて, App/Web別, iOS/Android/Web
  • 流入元の集計についての設定: 全部まとめて, 種類ごとに, メディアごとに, キャンペーンごとに, ...

このように、どういう粒度で集計するかというパラメータが存在しており、すべての組み合わせは約4万通りになりました。 この4万通り全てを見ているわけではないですが、必要なものだけ絞るというのも大変なため、一旦dbtによるテンプレートSQLを用いて生成してみました。

具体的には以下のようなSQLになります。

{{
    config(
        materialized='incremental',
        on_schema_change="append_new_columns",
        incremental_strategy="insert_overwrite",
        unique_key=['event_date'],
        partition_by={
            'field': 'ymd',
            'data_type': 'date',
            'granularity': 'day',
        },
    )
}}

with source as (
    /* 元データの抽出 */
), 
{%- set cte_names = [] %}
{%- for agg_timeframe in ['daily', 'weekly', 'monthly', 'quarterly', 'yearly'] %}
{%- for agg_platform in ['all', 'webapp', 'individually'] %}
{%- for agg_traffic in ['all', 'type', 'medium', 'campaign',...] %}
/* その他の集計条件もいっぱい。 */
{%- set cte_name = 'cte__'~agg_timeframe~'__'~agg_platform~'__'~agg_traffic %}
{%- set _ = cte_names.append(cte_name) %}
{{ cte_name }} as (
    /* 集計条件ごとの集計 */
    /* {%- if agg_timeframe == 'daily' %} ... {% endif %}  のようにして描画を切り替え */
)
{%- endfor %}
{%- endfor %}
{%- endfor %}

unioned as (
{%- for cte_name in cte_names %}
select * from {{ cte_name }}
{% if not loop.last %} union all {%- endif %}
{%- endfor %}
)

select * from unioned 

実行してみると、当然のごとく実行エラーとなりました。

07:48:52  Completed with 1 error and 0 warnings:
07:48:52  
07:48:52  Runtime Error in model xxx (models/datamarts/reports/xxx.sql)
07:48:52    413 POST https://bigquery.googleapis.com/bigquery/v2/projects/xxxxxxxxxxxxx/jobs?prettyPrint=false: <!DOCTYPE html>

<中略>

07:48:52    
07:48:52    
07:48:52    Location: US
07:48:52    Job ID: 00000000-0000-0000-0000-000000000000
07:48:52    
07:48:52  
07:48:52  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

BigQueryから HTTP Status 413 Request Entity Too Large が返ってきたというエラーです。 dbtでCompileしたSQLが長すぎたのです。

2段階テンプレート戦法

カヤックと同じくサポートで入っている風音屋3様と、『どうするのがいいのか?』という議論を、重ねました。 長期的な始点では、探索分析用のExploreを細分化して、パラメータを減らすことが望ましいという結論になりました。 しかし、それはそれで時間がかかるので、短期的にはすべてのパターンをdbtにより生成したいという結論にもなりました。

そこで、どうにかしてデータマートをする手段を考えました。それが、2段階テンプレート戦法です。

今回の問題は、dbtが描画するテンプレートSQLをCompileすると1つのSQLが長くなりすぎるというところが本質です。 つまり、dbtのモデルを分割して、1つのSQLを短くした後にすべてをUNIONしたviewやtableを生成すれば良いということになります。 BigQueryのドキュメントによれば、『未解決 Google SQL クエリの最大長 1MB』ということなので、dbtによってCompileしたSQLが1MBを超えないようにモデルを分割すれば良いということになります。

https://cloud.google.com/bigquery/quotas?hl=ja

そして、その数を試算してみると、約300モデルということがわかりました。 『よし、300個の似たようなSQLを筋肉を使って、書くぞおおおおおお!』とはなりません。プログラムで生成しましょう。 以下のようなpythonプログラムを用意しました。(一部抜粋)

import os
import click
import logging
from jinja2 import Environment, FileSystemLoader
from .recipe import load_recipe, generate_params_combinations
from sqlfmt.api import Mode, format_string
import sqlfluff
from sqlfluff.cli.commands import fix

def raise_helper(msg):
    raise Exception(msg)

@click.command()
@click.option('--log-level', default='INFO')
@click.option('--dry-run', is_flag=True)
@click.argument('recipe')
def cli(log_level, dry_run, recipe):
    """Generate models from a template and json context file"""

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)

    base_path = os.path.dirname(recipe)
    recipe_data = load_recipe(recipe)
    template = recipe_data['template']
    destination = recipe_data['destination']
    file_pattern = recipe_data['file_pattern']
    outputs = recipe_data.get('outputs', [])

    # defaultのblock_start_stringやblock_end_stringはdbtのものと被るので変更しておく。
    env = Environment(
        loader=FileSystemLoader(base_path),
        block_start_string='<%',
        block_end_string='%>',
        variable_start_string='<<',
        variable_end_string='>>',
        comment_start_string='<#',
        comment_end_string='#>',
    )
    env.globals['raise'] = raise_helper
    template = env.get_template(template)

    if not os.path.isabs(destination):
        abs_destination = os.path.abspath(os.path.join(base_path, destination))
    else:
        abs_destination = os.path.abspath(relative_path)

    if not os.path.exists(destination):
        os.makedirs(destination)

    mode = Mode({'dialect': 'bigquery'})
    params_combinations = generate_params_combinations(recipe_data)
    if dry_run:
        logger.warning('Dry run start.')
    logger.info(f'Found {len(params_combinations)} combinations')
    for params in params_combinations:
        file = file_pattern.format(**params)
        renderd = template.render(params)
        formatted_query = format_string(renderd, mode)
        abs_output_path = os.path.join(abs_destination, file)
        output_path = os.path.join(destination, file)
        logger.info(f'Writing {output_path} with params {params}')
        if dry_run:
            continue
        output_dir = os.path.dirname(abs_output_path)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        with open(abs_output_path, 'w') as f:
            f.write('{#\n-- This file is auto-generated. Do not edit. \n')
            f.write(f'-- command is `python -m src.trocco_dbt.tools.genmodel {recipe}`\n-- generate by {params}\n')
            f.write('#}\n')
            f.write(renderd)
            f.write('\n')

    if dry_run:
        logger.warning('Dry ended. No files were written')

if __name__ == '__main__':
    cli()

このようなプログラムに以下のJSONを与えて

{
  "template": "xxx_tpl.sql",
  "destination": "../models/datamarts/reports/intermediate/xxx/",
  "file_pattern": "{timeframe}_xxx_{platform}.sql",
  "params": {
    "timeframe": ["daily", "weekly", "monthly", "yearly", "quarterly"],
    "platform": ["all", "individually", "appweb"],
  }
}

次のようなテンプレートSQLを書きます。

{{
    config(
        materialized='incremental',
        on_schema_change="append_new_columns",
        incremental_strategy="insert_overwrite",
        unique_key=['event_date'],
        partition_by={
            'field': 'ymd',
            'data_type': 'date',
            'granularity': 'day',
        },
    )
}}

with source as (
    /* 元データの抽出 */
), 
{%- set cte_names = [] %}
{%- for agg_traffic in ['all', 'type', 'medium', 'campaign',...] %}
/* その他の集計条件もいっぱい。 */
{%- set cte_name = 'cte__'~agg_traffic %}
{%- set _ = cte_names.append(cte_name) %}
{{ cte_name }} as (
    /* 集計条件ごとの集計 */
    /* <%- if agg_timeframe == 'daily' %> ... <% endif %>  のようにして描画を切り替え: ここが変わる */
)
{%- endfor %}

unioned as (
{%- for cte_name in cte_names %}
select * from {{ cte_name }}
{% if not loop.last %} union all {%- endif %}
{%- endfor %}
)

select * from unioned 

先程のプログラムすることで、jsonの内容に従って大量のSQLが生成されることになります。

(.venv) $ files=(${(f)"$(find . -type f)"})
declare -A file_count_by_prefix
for file in $files; do
    prefix=${file%%_*}
    ((file_count_by_prefix[$prefix]++))
done

for prefix count in ${(kv)file_count_by_prefix}; do
    echo "Prefix: $prefix, Count: $count"
done
Prefix: ./daily, Count: 60
Prefix: ./quarterly, Count: 60
Prefix: ./weekly, Count: 60
Prefix: ./monthly, Count: 60
Prefix: ./yearly, Count: 60

300個のdbt用のテンプレートSQLがJinja2テンプレートからプログラムを使って生成されました。 これでCompileしたSQLが1MBを超えることはなくなりました。スマート(物理)に解決できました。

まとめ

カヤックでは、株式会社クラシコムとの協業プロジェクトでデータ基盤の構築・運用を行っています。 その中で、Lookerによって生成されてたデータマートをdbtによる生成に移行するプロジェクトを行っています。 移行プロジェクトの中には、dbtのテンプレートSQLをCompileした結果が長すぎてBigQueryがエラーを返すこともありましたが、 Jinja2テンプレートを使って、dbtのテンプレートSQLを多数生成することでスマート(物理)で解決する事例でした。 今回のような一風変わったdbtの活用例以外でも、一般的なdbt活用が進んでいます。

株式会社クラシコムでは現在、Webエンジニアを募集中です。興味のある方はお気軽にお問い合わせください。

面白法人カヤックでは枠にはまらないツールの使い方を考えるエンジニアも募集しています!

hubspot.kayac.com