dbtでRedshiftのCOPY JOB (S3 auto copy) を管理したい。

この記事はdbt Advent Calendar 2024および、Kayac Group Advent Calender 2024の4日目の記事になります。
こんにちは、その他事業部SREチーム所属の@mashiikeです。

背景

2024/10/30にAWS からAmazon Redshiftの素晴らしいUpdateが発表されました。 aws.amazon.com

自動コピーを使用して Amazon S3 から Amazon Redshift へのデータ取り込みです。
Redshift では、COPY コマンドを使用してデータを Amazon S3 から 効率的にデータをロードできますが、 このUpdateが来るまでは継続的な取り込みは Amazon Data Firehose を使用する方法が一般的でした。

しかし、カヤックでは「Firehoseの取り込みはメンテナンス時に一時停止できない」、「約10年前から運用しているシステムでは今更切り替えるメリットが薄い」等の理由で、
FirehoseがRedshiftに対応する前から存在する、fujiwara-wareの一つである Rinが続投するケースが多くありました。

github.com

この自動コピーという機能Updateでは、自動取り込みを一時的に止めるという手段も用意されているので、 ようやくRinが引退する時期がやってきたと思っています。
(この機能に関する詳しい内容は、冒頭のAWSのブログを参照していただければ幸いです。 )

データ取り込みをコードとして管理したい。

さて、そんな背景のもとカヤックでは長年の運用で、「データ取り込みの設定」をコードで管理することが定着していました。

例えば、Rinの場合は以下のような設定ファイルで管理していました。

queue_name: my_queue_name    # SQS queue name

credentials:
  aws_region: ap-northeast-1

redshift:
  host: default.123456789012.ap-northeast-1.redshift-serverless.amazonaws.com
  port: 5439
  dbname: dev 
  user: rin
  password: '{{ must_env "REDSHIFT_PASSWORD" }}'
  schema: public
  reconnect_on_error: true

s3:
  bucket: test.bucket.test
  region: ap-northeast-1

sql_option: "JSON 'auto'"

targets:
  - s3:
      key_prefix: s3_auto_copy_test/ 
  - redshift:
      table: peoples

Firehoseを用いる場合は、TeraformのようなIaC(Infra as a Code) を使用して管理することが多かったです。
ですので、今回の機能Updateで追加された、S3の自動コピーをする「Copy Job」もコード的に管理したいという気持ちは当然のごとく湧いてくるのです。

dbtで管理する

近年のデータエンジニアリング周りでは、よくデータ変換周りでdbtを使うことがあると思います。

www.getdbt.com

dbtは、データ変換を行うツールで、SQLを書くだけでデータ変換を行うことができます。 このdbtはmacroという形で様々な機能拡張ができ、外部テーブルの管理をするdbt-external-tablesというものも存在します。

そこで、私はこう思いました。
「外部テーブルをdbtで管理できるんだから、Copy Jobも管理できるんじゃないか!」 「dbtのSourcesの設定の一環として管理できれば、いいじゃん!?」

ということで、サクッとPoCのためのマクロを書いてみました。

このマクロの中身の詳細については割愛*1 するとして、使い方のイメージとしては以下のようになります。

models/sources.yml

version: 2

sources:
  - name: s3_sources
    schema: public
    loader: s3_copy_job  # ここをみて対象かどうかを決めている。
    description: 'Load with Redshift S3 event integrations'
    config:
      iam_role_arn: arn:aws:iam::123456789012:role/RedshiftRole
      is_auto: "{{ var('s3_auto_copy_is_auto', 'True') | as_bool }}"
      copy_parameters: >
        FORMAT AS JSON 'auto'
    tables:
      - name: peoples
        description: "This is a test table for s3 auto copy"
        config:
          location: 's3://test.bucket.test/s3_auto_copy_test/'

sourcesの下のloaderというところが s3_copy_job となっているsourcesを対象に、configに格納された情報をつかってCopy Jobを作成します。 今のマクロでは、copy_parametersが変わっても、追従しませんが何かマクロをもう少し改造したら、いい感じになる気がします。

使い方はdbtを使い慣れてる人なら、察していると思いますが、macrosのディレクトリ配下にマクロを配置して、sources.ymlに記述し、dbt_project.ymlのon-run-start hookで呼び出す想定です。

dbt_project.yml

name: 'hoge'
version: '1.0.0'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

profile: 'dev'

models:
  hoge:
    +materialized: view

#中略
## これを追記。
on-run-start:
  - "{{ manage_s3_copy_job() }}"

実際に動かしてみると、以下のようになります。

$ dbt build
07:26:06  Running with dbt=1.8.9
07:26:06  Registered adapter: redshift=1.8.1
07:26:06  Found 2 models, 1 operation, 4 data tests, 1 source, 496 macros
07:26:06  
07:26:07  
07:26:07  Running 1 on-run-start hook
07:26:07  ====== manage s3 copy jobs ============
07:26:07  1 of 1 START public.peopoles from s3://test.bucket.test/s3_auto_copy_test/
07:26:08  1 of 1 OK public.peopoles from s3://test.bucket.test/s3_auto_copy_test/[CREATE COPY JOB SUCCESS in dbt__public__peopoles]
07:26:08  =======================================
07:26:08  1 of 1 START hook: s3_auto_copy.on-run-start.0 ................................. [RUN]
07:26:08  1 of 1 OK hook: s3_auto_copy.on-run-start.0 .................................... [OK in 0.00s]
07:26:08  
07:26:08  Concurrency: 15 threads (target='dev')
07:26:08  
07:26:08  1 of 6 START sql table model public.my_first_dbt_model ......................... [RUN]
07:26:09  1 of 6 OK created sql table model public.my_first_dbt_model .................... [SUCCESS in 1.16s]
07:26:09  2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN]
07:26:09  3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN]
07:26:10  2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.32s]
07:26:10  3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.32s]
07:26:10  4 of 6 START sql view model public.my_second_dbt_model ......................... [RUN]
07:26:10  4 of 6 OK created sql view model public.my_second_dbt_model .................... [SUCCESS in 0.77s]
07:26:11  5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN]
07:26:11  6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN]
07:26:11  5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.27s]
07:26:11  6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.28s]
07:26:11  
07:26:11  Finished running 1 table model, 4 data tests, 1 view model, 1 project hook in 0 hours 0 minutes and 4.83 seconds (4.83s).
07:26:11  
07:26:11  Completed successfully
07:26:11  
07:26:11  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6

ちなみに、 varsを与えて、COPY JOBを一時停止することもできます。

$ dbt build --vars '{"s3_auto_copy_is_auto":False}'
07:30:31  Running with dbt=1.8.9
07:30:31  Registered adapter: redshift=1.8.1
07:30:31  Unable to do partial parsing because config vars, config profile, or config target have changed
07:30:32  Found 2 models, 1 operation, 4 data tests, 1 source, 496 macros
07:30:32  
07:30:33  
07:30:33  Running 1 on-run-start hook
07:30:33  ====== manage s3 copy jobs ============
07:30:33  1 of 1 START public.peopoles from s3://test.bucket.test/s3_auto_copy_test/
07:30:34  1 of 1 OK public.peopoles from s3://test.bucket.test/s3_auto_copy_test/[ALTER COPY JOB SUCCESS in dbt__public__peopoles]
07:30:34  =======================================
07:30:34  1 of 1 START hook: s3_auto_copy.on-run-start.0 ................................. [RUN]
07:30:34  1 of 1 OK hook: s3_auto_copy.on-run-start.0 .................................... [OK in 0.00s]
07:30:34  
07:30:34  Concurrency: 15 threads (target='dev')
07:30:34  
07:30:34  1 of 6 START sql table model public.my_first_dbt_model ......................... [RUN]
07:30:35  1 of 6 OK created sql table model public.my_first_dbt_model .................... [SUCCESS in 1.35s]
07:30:35  2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN]
07:30:35  3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN]
07:30:35  3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.30s]
07:30:35  2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.30s]
07:30:35  4 of 6 START sql view model public.my_second_dbt_model ......................... [RUN]
07:30:36  4 of 6 OK created sql view model public.my_second_dbt_model .................... [SUCCESS in 0.67s]
07:30:36  5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN]
07:30:36  6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN]
07:30:36  5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.25s]
07:30:36  6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.26s]
07:30:36  
07:30:36  Finished running 1 table model, 4 data tests, 1 view model, 1 project hook in 0 hours 0 minutes and 4.50 seconds (4.50s).
07:30:36  
07:30:36  Completed successfully
07:30:36  
07:30:36  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6

PoCのために書いたマクロですので、まだ実戦投入してませんが、なにかのたたき台になれば幸いです。

まとめ

最近は、データエンジニアリング周りでdbtを活用していくのが浸透してきたように感じます。 Redshiftだけにとどまらず、BigQueryやSnowflakeといったDWH全般において、SQLのインタフェースからできることが格段に増えています。 今回のように、新機能として追加されたばかりのものでも、dbtではmacroをかければさっとdbtの管理下に乗せることができます。 この記事をきっかけに、Redshiftやdbtにより一層興味を持っていただければ幸いです。

カヤックではちょっぱやで実装できるエンジニアも募集しています!

hubspot.kayac.com

*1:内容としては、dbt macro tips Advent Calendar 2022のday 8 あたりが参考になるかも