Redshift Federated Query for RDS/Aurora MySQL をつかったType-2 Slowly Changing Dimensionの実装

こんにちは。技術部の自称データエンジニアの池田です。

Redshift Federated Query for RDS/Aurora MySQL(Federated Query for MySQL)がめでたくGAになりました。 Federated Query for MySQLを使うと、RedshiftからAurora MySQLにクエリを発行し、その結果をRedshift上で利用することができます。 今回は、この機能を使ったType-2 Slowly Changing Dimension(SCD2) の実装の話をします。

aws.amazon.com

TL;DR

  • Change Data Capture(CDC)を実装・運用するほどじゃないけど、State Sourcingなテーブルの変更履歴を追跡したいときには、SCD2を使うと嬉しいです。
  • Federated Query for MySQL機能が非常に便利です。簡単に設定の仕方を説明しました。
  • 対象のソースデータベースがMySQL互換で、Redshift上に変更履歴を構成する場合はSCD2とFederated Query for MySQLを活用するとお手軽です

背景

プロダクトやサービスで使用するデータベースの多くはアプリケーションの都合上、StateSourcingなデータマネジメントを採用したテーブルが主体となることが多いでしょう。
StateSourcingとは、起こった事象によって変化させた状態を保存するやり方です。例えば、ユーザーの名前やプロフィール画像などを変更した場合は、基本的にテーブルのデータを上書きし、最新のものだけデータベースに保存します。
しかし、StateSourcingなテーブルはあとから変更前のデータを振り返ることがとても困難です。カスタマーサポートやデータ分析観点ではデータがどのように変更されていったのか?変更履歴の追跡が非常に重要になってきます。
Amazon Relational Database Service(RDS)を使用している場合、変更履歴を追跡する一番簡単な方法は毎日1回S3へスナップショットをExportすることです 毎日1回S3へのスナップショットをExportで変更履歴を追跡する場合の悩みごととして以下があります。

  • データの変更頻度が1日よりも高い場合は、1日1回のスナップショットでは細かい変化が追跡できない。
  • データの変更頻度がとても低い場合、変化のないデータも毎日出力・保存されるので容量の無駄がある
  • データのどの部分が変更されたのか?という情報を追跡するのが、それなりに大変である。

そこで、これらの悩みが気になる場合は、Change Data Capture(CDC)と呼ばれるプロセスと技術を用いることになります。 例えば、Aurora MySQLでCDCを実現する場合は、Binlogを有効にし、Amazon Database Migration Service(DMS)等を使用して、行の変更履歴をS3へ貯めることになるでしょう。 ところが、実際のCDCの実装・運用では以下のような悩みに出会います。

  • CDCは構築・運用が大変
    • DMSを使えば簡単そうだが、DMSがよくわからない
    • DMSを使わない場合は、Binlog等を処理する必要がある。
  • パフォーマンス上の問題でAurora MySQLのBinlogを有効にしたくない
  • etc...

一般的にCDCの構築・運用は高コストと言われており、気軽に導入は難しいでしょう。 そんなときに、選択肢の一つに入るのが Type-2 Slowly Changing Dimension(SCD2) による変更履歴の構築です。

Type-2 Slowly Changing Dimension(SCD2) とは?

SCD2はData Warehouseのデータマネジメント手法の1つで、ソースデータベースのデータを確認した際に、自然キーや代理キーごとにデータの変更を確認し変化があったら行を追加するというものです。

https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row

内容は単純ですが、この手法で変更履歴を構築することはとても強力です。 SCD2の実装は、データベースの操作的にコストがかかることが多いので、(日次よりは頻繁だが)それほど高頻度に変化しないデータの変更履歴追跡に向いています。 ミリ秒単位でデータが変化し続けるものや、絶対に変更前をロストしたくない場合はCDCを選択することをおすすめします。

Redshift Federated Query for RDS/Aurora MySQL を使ったSCD2の実装

準備

Redshiftを採用しており、アプリケーションが書き込むデータベースにMySQL互換データベースを使用している場合には、この度めでたくGAになったRedshift Federated Query for RDS/Aurora MySQLがとても便利です。 利用に際しては、以下の準備が必要です。

  • RedshiftからMySQLへのネットワーク接続ができる状態 (Security GroupやVPCの設定で疎通できる状態)
  • MySQL上にRedshiftが接続に使うユーザーが存在し、システムテーブルと読みとり対象のテーブルにSELECTできる状態 (面倒であれば、 GRANT SELECT ON *.* TO 'redshift'@'%' のように与える)
  • AWS Secrets ManagerにMySQLへの接続情報を格納し、その情報にアクセスできるIAMロールをRedshiftクラスターにアタッチしておく

この準備ができましたら、Redshift上で以下のような外部スキーマを定義します。

CREATE EXTERNAL SCHEMA federated_for_mysql
FROM MYSQL
DATABASE 'main'
URI 'app.cluster-ro-xxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com'
IAM_ROLE 'arn:aws:iam::000000000000:role/redshift-federated'
SECRET_ARN 'arn:aws:secretsmanager:ap-northeast-1:000000000000:secret:prod/rds/redshift-federated-XXXXXX';

ここで指定するIAMロールは、SECRET_ARNで指定するSecrets ManagerにおいてあるMySQLへの接続情報にアクセスできる必要があります。

基本的な使い方

仮に、MySQL側に以下のようなテーブルが有るとします。

CREATE TABLE `item` (
    `id` BIGINT UNSIGNED NOT NULL,
    `name` VARCHAR(191) NOT NULL,
    `updated_at` TIMESTAMP NOT NULL,
    `created_at` TIMESTAMP NOT NULL,
    PRIMARY KEY (`id`),
    INDEX `updated_at` (`updated_at`)
) ENGINE=InnoDB DEFAULT CHARACTER SET utf8mb4;

外部スキーマを定義により、このテーブルはRedshift上から以下のSQLで参照できます。

SELECT *
FROM federated_for_mysql.item
WHERE updated_at >= convert_timezone('JST', getdate()) - interval '1 day';

このクエリのExplainを確認すると、以下のような内容になります。

XN MySQL Query Scan competition  (cost=0.00..341.04 rows=***** width=*****)
  ->  Remote MySQL Seq Scan federated_for_mysql.item  (cost=0.00..269.24 rows=****** width=******)
        Filter: (updated_at >= '2021-09-29 16:30:20'::timestamp without time zone)

どうやら、Remote MySQL Seq Scanのステップでupdated_atのFilterがかかるようですね。 このクエリは、MySQL側のIndexの恩恵を受けられそうです。

SCD2の実装

先程のitemテーブル変更履歴をRedshiftに構築してみましょう。 まず、保存先のテーブルを用意します。

CREATE TABLE item_history (
    id BIGINT,
    name VARCHAR(191),
    updated_at TIMESTAMP,
    created_at TIMESTAMP,
    scd2_id char(64)
)
DISTKEY(scd2_id)
SORTKEY(scd2_id);

以降、Redshift上のitem_historyテーブルに変更履歴をためていくためには以下のクエリを実行します。

CREATE TEMP TABLE updated_item
AS
SELECT *, UPPER(SHA2(id || ';' || updated_at, 256))::char(64) as scd2_id
FROM federated_for_mysql.item
WHERE updated_at >= convert_timezone('JST', getdate()) - interval '1 day';

DELETE FROM updated_item
USING item_history
WHERE updated_item.scd2_id = item_history.scd2_id;

INSERT INTO item_history
SELECT * FROM updated_item;

このようにすることで、SQLを実行したタイミングで過去1日以内の変更を取得して、まだitem_historyに入っていない変更だけを入れることができます。 上記のSQLをお好きな間隔で定期的に実行することで、その間隔ごとの変更を検出・記録した変更履歴が構築できます。

DBT snapshot

実際にSCD2を実装・運用する場合だと、ソースデータベースのスキーマ変更に追従する等の作業が必要になります。 よりお手軽にしたい場合はDBTというツールのsnapshotという機能を使うことをおすすめします。

docs.getdbt.com

DBTというツールは、SQLを使ったデータ変換を実現するためのSQLビルダー/ランナーです。 細かいDBTのセットアップ等は省きますが、以下のようなsqlファイルを作成すれば、面倒なマイグレーション等はDBTに任せることができます。

{% snapshot item_history %}

{{
    config(
      unique_key='id',
      strategy='timestamp',
      updated_at='updated_at',
      invalidate_hard_deletes=False,
    )
}}

SELECT * 
FROM federated_for_mysql.item
WHERE updated_at >= convert_timezone('JST', getdate()) - interval '1 day'

{% endsnapshot %}

興味があれば、導入を検討してみてください。

おわりに

この記事では、 Redshift Federated Query for RDS/Aurora MySQL の素晴らしさを共有するために、この機能を使ったType-2 Slowly Changing Dimension の実装例について話しました。 この機能はSCD2以外にも役に立つことが沢山あり、とても素晴らしい機能だと感じました。 例えば、運用中のプロダクトあるあるなアンチパターン JSONがTextで入ったカラム も、SUPER型を併用することでデータ分析上では格段に取り扱いやすくなります。 Redshiftを用いており、アプリケーションが書き込むDBがMySQLの場合は試してみることをおすすめします。

カヤックではデータパイプラインの整備に興味のあるエンジニアも募集しています

中途採用も募集しています