この記事はdbt Advent Calendar 2024および、Kayac Group Advent Calender 2024の4日目の記事になります。
2024/10/30にAWS からAmazon Redshiftの素晴らしいUpdateが発表されました。 aws.amazon.com
自動コピーを使用して Amazon S3 から Amazon Redshift へのデータ取り込みです。
Redshift では、COPY コマンドを使用してデータを Amazon S3 から 効率的にデータをロードできますが、
このUpdateが来るまでは継続的な取り込みは Amazon Data Firehose を使用する方法が一般的でした。
FirehoseがRedshiftに対応する前から存在する、fujiwara-wareの一つである Rinが続投するケースが多くありました。
(この機能に関する詳しい内容は、冒頭のAWSのブログを参照していただければ幸いです。 )
queue_name: my_queue_name # SQS queue name credentials: aws_region: ap-northeast-1 redshift: host: default.123456789012.ap-northeast-1.redshift-serverless.amazonaws.com port: 5439 dbname: dev user: rin password: '{{ must_env "REDSHIFT_PASSWORD" }}' schema: public reconnect_on_error: true s3: bucket: test.bucket.test region: ap-northeast-1 sql_option: "JSON 'auto'" targets: - s3: key_prefix: s3_auto_copy_test/ - redshift: table: peoples
Firehoseを用いる場合は、TeraformのようなIaC(Infra as a Code) を使用して管理することが多かったです。
ですので、今回の機能Updateで追加された、S3の自動コピーをする「Copy Job」もコード的に管理したいという気持ちは当然のごとく湧いてくるのです。
dbtは、データ変換を行うツールで、SQLを書くだけでデータ変換を行うことができます。 このdbtはmacroという形で様々な機能拡張ができ、外部テーブルの管理をするdbt-external-tablesというものも存在します。
「外部テーブルをdbtで管理できるんだから、Copy Jobも管理できるんじゃないか!」
このマクロの中身の詳細については割愛*1 するとして、使い方のイメージとしては以下のようになります。
version: 2 sources: - name: s3_sources schema: public loader: s3_copy_job # ここをみて対象かどうかを決めている。 description: 'Load with Redshift S3 event integrations' config: iam_role_arn: arn:aws:iam::123456789012:role/RedshiftRole is_auto: "{{ var('s3_auto_copy_is_auto', 'True') | as_bool }}" copy_parameters: > FORMAT AS JSON 'auto' tables: - name: peoples description: "This is a test table for s3 auto copy" config: location: 's3://test.bucket.test/s3_auto_copy_test/'
sourcesの下のloaderというところが s3_copy_job
となっているsourcesを対象に、configに格納された情報をつかってCopy Jobを作成します。
使い方はdbtを使い慣れてる人なら、察していると思いますが、macrosのディレクトリ配下にマクロを配置して、sources.ymlに記述し、dbt_project.ymlのon-run-start hookで呼び出す想定です。
name: 'hoge' version: '1.0.0' model-paths: ["models"] analysis-paths: ["analyses"] test-paths: ["tests"] seed-paths: ["seeds"] macro-paths: ["macros"] snapshot-paths: ["snapshots"] profile: 'dev' models: hoge: +materialized: view #中略 ## これを追記。 on-run-start: - "{{ manage_s3_copy_job() }}"
$ dbt build 07:26:06 Running with dbt=1.8.9 07:26:06 Registered adapter: redshift=1.8.1 07:26:06 Found 2 models, 1 operation, 4 data tests, 1 source, 496 macros 07:26:06 07:26:07 07:26:07 Running 1 on-run-start hook 07:26:07 ====== manage s3 copy jobs ============ 07:26:07 1 of 1 START public.peopoles from s3://test.bucket.test/s3_auto_copy_test/ 07:26:08 1 of 1 OK public.peopoles from s3://test.bucket.test/s3_auto_copy_test/[CREATE COPY JOB SUCCESS in dbt__public__peopoles] 07:26:08 ======================================= 07:26:08 1 of 1 START hook: s3_auto_copy.on-run-start.0 ................................. [RUN] 07:26:08 1 of 1 OK hook: s3_auto_copy.on-run-start.0 .................................... [OK in 0.00s] 07:26:08 07:26:08 Concurrency: 15 threads (target='dev') 07:26:08 07:26:08 1 of 6 START sql table model public.my_first_dbt_model ......................... [RUN] 07:26:09 1 of 6 OK created sql table model public.my_first_dbt_model .................... [SUCCESS in 1.16s] 07:26:09 2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN] 07:26:09 3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN] 07:26:10 2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.32s] 07:26:10 3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.32s] 07:26:10 4 of 6 START sql view model public.my_second_dbt_model ......................... [RUN] 07:26:10 4 of 6 OK created sql view model public.my_second_dbt_model .................... [SUCCESS in 0.77s] 07:26:11 5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN] 07:26:11 6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN] 07:26:11 5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.27s] 07:26:11 6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.28s] 07:26:11 07:26:11 Finished running 1 table model, 4 data tests, 1 view model, 1 project hook in 0 hours 0 minutes and 4.83 seconds (4.83s). 07:26:11 07:26:11 Completed successfully 07:26:11 07:26:11 Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6
ちなみに、 varsを与えて、COPY JOBを一時停止することもできます。
$ dbt build --vars '{"s3_auto_copy_is_auto":False}' 07:30:31 Running with dbt=1.8.9 07:30:31 Registered adapter: redshift=1.8.1 07:30:31 Unable to do partial parsing because config vars, config profile, or config target have changed 07:30:32 Found 2 models, 1 operation, 4 data tests, 1 source, 496 macros 07:30:32 07:30:33 07:30:33 Running 1 on-run-start hook 07:30:33 ====== manage s3 copy jobs ============ 07:30:33 1 of 1 START public.peopoles from s3://test.bucket.test/s3_auto_copy_test/ 07:30:34 1 of 1 OK public.peopoles from s3://test.bucket.test/s3_auto_copy_test/[ALTER COPY JOB SUCCESS in dbt__public__peopoles] 07:30:34 ======================================= 07:30:34 1 of 1 START hook: s3_auto_copy.on-run-start.0 ................................. [RUN] 07:30:34 1 of 1 OK hook: s3_auto_copy.on-run-start.0 .................................... [OK in 0.00s] 07:30:34 07:30:34 Concurrency: 15 threads (target='dev') 07:30:34 07:30:34 1 of 6 START sql table model public.my_first_dbt_model ......................... [RUN] 07:30:35 1 of 6 OK created sql table model public.my_first_dbt_model .................... [SUCCESS in 1.35s] 07:30:35 2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN] 07:30:35 3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN] 07:30:35 3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.30s] 07:30:35 2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.30s] 07:30:35 4 of 6 START sql view model public.my_second_dbt_model ......................... [RUN] 07:30:36 4 of 6 OK created sql view model public.my_second_dbt_model .................... [SUCCESS in 0.67s] 07:30:36 5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN] 07:30:36 6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN] 07:30:36 5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.25s] 07:30:36 6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.26s] 07:30:36 07:30:36 Finished running 1 table model, 4 data tests, 1 view model, 1 project hook in 0 hours 0 minutes and 4.50 seconds (4.50s). 07:30:36 07:30:36 Completed successfully 07:30:36 07:30:36 Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6
最近は、データエンジニアリング周りでdbtを活用していくのが浸透してきたように感じます。 Redshiftだけにとどまらず、BigQueryやSnowflakeといったDWH全般において、SQLのインタフェースからできることが格段に増えています。 今回のように、新機能として追加されたばかりのものでも、dbtではmacroをかければさっとdbtの管理下に乗せることができます。 この記事をきっかけに、Redshiftやdbtにより一層興味を持っていただければ幸いです。
*1:内容としては、dbt macro tips Advent Calendar 2022のday 8 あたりが参考になるかも