この記事はdbt Advent Calendar 2023の5日目です。
こんにちは、その他事業部SREチーム所属の@mashiikeです。
カヤックは様々な事業・プロジェクトを展開しておりますが、その一つとして『北欧、暮らしの道具店』を運営する株式会社クラシコムとの協業プロジェクトがあります。 www.kayac.com
こちらのプロジェクトでは2019年より継続して、クラシコム様のデータ基盤の構築・運用のサポートの一部を行っております。
その中で、troccoのdbt連携機能を用いて、データの変換を実装しております。1
今回の記事は、同プロジェクトの中で行われた一風変わったdbtの活用例の紹介になります。
内容の関係上、予めLookerの用語と概念を知っていると読みやすいと思います。
cloud.google.com
背景
クラシコム様のデータ分析基盤では、ビジネスインテリジェンスにLookerを採用しており、Lookerを用いたレポートが多数存在しています。 2022年10月頃まで、データマートはLookMLのPersistent Derived Tables(PDT) を用いて実装されることが多くありました。 どのようにしてデータマートの実装をしていたかというと、Looker Forumsに投稿された『How to: Simple incremental PDTs using create_process』を応用して作っていました。 (2019年12月頃に日本人コミュニティで翻訳されたものがこちらになります。) より具体的には、次のようなLookMLによるETLパイプラインが多く存在していることになります。
view: extractor { sql_table_name: log.xxx filter: target_date { type: date default_value: "before yesterday" } dimension: event_date {} dimension: user_id {} mesure uu { type: count_distinct sql: ${user_id} } } explore source { sql_always_where: {% condition target_date %}event_date{% endcondition %} } view transformer__all { derived_table: { explore_source: extractor { filters: [ extractor.target_date: "before yesterday", ] timezone: Asia/Tokyo column: event_date {} column: uu {} } } } view transformer__latest { derived_table: { explore_source: extractor { filters: [ extractor.target_date: "after 5 days ago", ] timezone: Asia/Tokyo column: event_date {} column: uu {} } } } view loader { derived_table: { create_process: { sql_step: CREATE TABLE IF NOT EXISTS datamarts.xxxx PARTITION BY date(event_date) AS SELECT * FROM ${transformer__all.SQL_TABLE_NAME} ;; sql_step: INSERT INTO datamarts.xxxx SELECT * FROM ${transformer__latest.SQL_TABLE_NAME} ;; sql_step: CREATE VIEW IF NOT EXISTS ${SQL_TABLE_NAME} AS SELECT * FROM datamarts.xxxx ;; } datagroup_trigger: daily } } explore loader {}
上記のようなLookMLを用いたETLが多数存在していたのですが、管理・運用が困難になってきたので2022年10月頃よりtroccoを用いたdbtによるETLパイプラインの移行プロジェクトが始まりました。 ちなみに、上記のLookMLをdbtのテンプレートSQLに変換すると、大体こんな感じになると思います。
{{ config( materialized='incremental', on_schema_change="append_new_columns", incremental_strategy="insert_overwrite", unique_key=['event_date'], partition_by={ 'field': 'event_date', 'data_type': 'date', 'granularity': 'day', }, ) }} SELECT event_date, COUNT(distinct user_id) as uu FROM {{ source('log','xxx') }} {%- if is_incremental() %} WHERE event_date >= {{ dbt_utils.dateadd(day, -5, 'CURRENT_DATE("Asia/Tokyo")') }} {%- endif %} GROUP BY 1
dbtで書くとだいぶ簡潔になりますね。
上記の例では、データ変換のロジックが簡単ですが、実際にはとても複雑なデータ変換が行われています。
複雑なデータ変換をLookMLのderived_table
から、dbtのテンプレートSQLに移行することで、LookMLを簡潔にする
ということが、この移行プロジェクトの目的の一つです。
さて、ここまでの背景を踏まえて、今回の記事の本題に入ります。
BigQueryから HTTP Status 413 Request Entity Too Large が返ってきた。
移行を進めていく中で、あるデータマートにて厄介な問題に遭遇しました。
- あるレポート用のデータマートが、普段の探索分析用のExplore2を参照していた。
- 探索分析用のExploreは9種類のパラメータを持っている複雑なderived_tableで構成されていた。
- 9種類のパラメータのいくつかの組み合わせをレポートとして出力していた。
レポート用のデータマートだけを移行しても良いのですが、LookMLを簡潔にする
という目的もあるので、
探索分析用のExploreで使っている複雑なderived_table
もdbtを使って予めデータマート化しておきくなりました。
そこで、探索分析用のデータマートを作ろうとするのですが、9種類のパラメータというのが厄介になります。
ここで言うところのパラメータとは、次のようなものです。
- 集計時間枠(timeframe)の設定: daily, weekly, monthly, quarterly,yearly
- 集計プラットフォームの設定: 全部まとめて, App/Web別, iOS/Android/Web
- 流入元の集計についての設定: 全部まとめて, 種類ごとに, メディアごとに, キャンペーンごとに, ...
このように、どういう粒度で集計するかというパラメータが存在しており、すべての組み合わせは約4万通りになりました。 この4万通り全てを見ているわけではないですが、必要なものだけ絞るというのも大変なため、一旦dbtによるテンプレートSQLを用いて生成してみました。
具体的には以下のようなSQLになります。
{{ config( materialized='incremental', on_schema_change="append_new_columns", incremental_strategy="insert_overwrite", unique_key=['event_date'], partition_by={ 'field': 'ymd', 'data_type': 'date', 'granularity': 'day', }, ) }} with source as ( /* 元データの抽出 */ ), {%- set cte_names = [] %} {%- for agg_timeframe in ['daily', 'weekly', 'monthly', 'quarterly', 'yearly'] %} {%- for agg_platform in ['all', 'webapp', 'individually'] %} {%- for agg_traffic in ['all', 'type', 'medium', 'campaign',...] %} /* その他の集計条件もいっぱい。 */ {%- set cte_name = 'cte__'~agg_timeframe~'__'~agg_platform~'__'~agg_traffic %} {%- set _ = cte_names.append(cte_name) %} {{ cte_name }} as ( /* 集計条件ごとの集計 */ /* {%- if agg_timeframe == 'daily' %} ... {% endif %} のようにして描画を切り替え */ ) {%- endfor %} {%- endfor %} {%- endfor %} unioned as ( {%- for cte_name in cte_names %} select * from {{ cte_name }} {% if not loop.last %} union all {%- endif %} {%- endfor %} ) select * from unioned
実行してみると、当然のごとく実行エラーとなりました。
07:48:52 Completed with 1 error and 0 warnings: 07:48:52 07:48:52 Runtime Error in model xxx (models/datamarts/reports/xxx.sql) 07:48:52 413 POST https://bigquery.googleapis.com/bigquery/v2/projects/xxxxxxxxxxxxx/jobs?prettyPrint=false: <!DOCTYPE html> <中略> 07:48:52 07:48:52 07:48:52 Location: US 07:48:52 Job ID: 00000000-0000-0000-0000-000000000000 07:48:52 07:48:52 07:48:52 Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1
BigQueryから HTTP Status 413 Request Entity Too Large が返ってきたというエラーです。 dbtでCompileしたSQLが長すぎたのです。
2段階テンプレート戦法
カヤックと同じくサポートで入っている風音屋3様と、『どうするのがいいのか?』という議論を、重ねました。 長期的な始点では、探索分析用のExploreを細分化して、パラメータを減らすことが望ましいという結論になりました。 しかし、それはそれで時間がかかるので、短期的にはすべてのパターンをdbtにより生成したいという結論にもなりました。
そこで、どうにかしてデータマートをする手段を考えました。それが、2段階テンプレート戦法です。
今回の問題は、dbtが描画するテンプレートSQLをCompileすると1つのSQLが長くなりすぎるというところが本質です。 つまり、dbtのモデルを分割して、1つのSQLを短くした後にすべてをUNIONしたviewやtableを生成すれば良いということになります。 BigQueryのドキュメントによれば、『未解決 Google SQL クエリの最大長 1MB』ということなので、dbtによってCompileしたSQLが1MBを超えないようにモデルを分割すれば良いということになります。
https://cloud.google.com/bigquery/quotas?hl=ja
そして、その数を試算してみると、約300モデルということがわかりました。 『よし、300個の似たようなSQLを筋肉を使って、書くぞおおおおおお!』とはなりません。プログラムで生成しましょう。 以下のようなpythonプログラムを用意しました。(一部抜粋)
import os import click import logging from jinja2 import Environment, FileSystemLoader from .recipe import load_recipe, generate_params_combinations from sqlfmt.api import Mode, format_string import sqlfluff from sqlfluff.cli.commands import fix def raise_helper(msg): raise Exception(msg) @click.command() @click.option('--log-level', default='INFO') @click.option('--dry-run', is_flag=True) @click.argument('recipe') def cli(log_level, dry_run, recipe): """Generate models from a template and json context file""" logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) logger.setLevel(log_level) base_path = os.path.dirname(recipe) recipe_data = load_recipe(recipe) template = recipe_data['template'] destination = recipe_data['destination'] file_pattern = recipe_data['file_pattern'] outputs = recipe_data.get('outputs', []) # defaultのblock_start_stringやblock_end_stringはdbtのものと被るので変更しておく。 env = Environment( loader=FileSystemLoader(base_path), block_start_string='<%', block_end_string='%>', variable_start_string='<<', variable_end_string='>>', comment_start_string='<#', comment_end_string='#>', ) env.globals['raise'] = raise_helper template = env.get_template(template) if not os.path.isabs(destination): abs_destination = os.path.abspath(os.path.join(base_path, destination)) else: abs_destination = os.path.abspath(relative_path) if not os.path.exists(destination): os.makedirs(destination) mode = Mode({'dialect': 'bigquery'}) params_combinations = generate_params_combinations(recipe_data) if dry_run: logger.warning('Dry run start.') logger.info(f'Found {len(params_combinations)} combinations') for params in params_combinations: file = file_pattern.format(**params) renderd = template.render(params) formatted_query = format_string(renderd, mode) abs_output_path = os.path.join(abs_destination, file) output_path = os.path.join(destination, file) logger.info(f'Writing {output_path} with params {params}') if dry_run: continue output_dir = os.path.dirname(abs_output_path) if not os.path.exists(output_dir): os.makedirs(output_dir) with open(abs_output_path, 'w') as f: f.write('{#\n-- This file is auto-generated. Do not edit. \n') f.write(f'-- command is `python -m src.trocco_dbt.tools.genmodel {recipe}`\n-- generate by {params}\n') f.write('#}\n') f.write(renderd) f.write('\n') if dry_run: logger.warning('Dry ended. No files were written') if __name__ == '__main__': cli()
このようなプログラムに以下のJSONを与えて
{ "template": "xxx_tpl.sql", "destination": "../models/datamarts/reports/intermediate/xxx/", "file_pattern": "{timeframe}_xxx_{platform}.sql", "params": { "timeframe": ["daily", "weekly", "monthly", "yearly", "quarterly"], "platform": ["all", "individually", "appweb"], } }
次のようなテンプレートSQLを書きます。
{{ config( materialized='incremental', on_schema_change="append_new_columns", incremental_strategy="insert_overwrite", unique_key=['event_date'], partition_by={ 'field': 'ymd', 'data_type': 'date', 'granularity': 'day', }, ) }} with source as ( /* 元データの抽出 */ ), {%- set cte_names = [] %} {%- for agg_traffic in ['all', 'type', 'medium', 'campaign',...] %} /* その他の集計条件もいっぱい。 */ {%- set cte_name = 'cte__'~agg_traffic %} {%- set _ = cte_names.append(cte_name) %} {{ cte_name }} as ( /* 集計条件ごとの集計 */ /* <%- if agg_timeframe == 'daily' %> ... <% endif %> のようにして描画を切り替え: ここが変わる */ ) {%- endfor %} unioned as ( {%- for cte_name in cte_names %} select * from {{ cte_name }} {% if not loop.last %} union all {%- endif %} {%- endfor %} ) select * from unioned
先程のプログラムすることで、jsonの内容に従って大量のSQLが生成されることになります。
(.venv) $ files=(${(f)"$(find . -type f)"}) declare -A file_count_by_prefix for file in $files; do prefix=${file%%_*} ((file_count_by_prefix[$prefix]++)) done for prefix count in ${(kv)file_count_by_prefix}; do echo "Prefix: $prefix, Count: $count" done Prefix: ./daily, Count: 60 Prefix: ./quarterly, Count: 60 Prefix: ./weekly, Count: 60 Prefix: ./monthly, Count: 60 Prefix: ./yearly, Count: 60
300個のdbt用のテンプレートSQLがJinja2テンプレートからプログラムを使って生成されました。 これでCompileしたSQLが1MBを超えることはなくなりました。スマート(物理)に解決できました。
まとめ
カヤックでは、株式会社クラシコムとの協業プロジェクトでデータ基盤の構築・運用を行っています。 その中で、Lookerによって生成されてたデータマートをdbtによる生成に移行するプロジェクトを行っています。 移行プロジェクトの中には、dbtのテンプレートSQLをCompileした結果が長すぎてBigQueryがエラーを返すこともありましたが、 Jinja2テンプレートを使って、dbtのテンプレートSQLを多数生成することでスマート(物理)で解決する事例でした。 今回のような一風変わったdbtの活用例以外でも、一般的なdbt活用が進んでいます。
株式会社クラシコムでは現在、Webエンジニアを募集中です。興味のある方はお気軽にお問い合わせください。