SREチームの池田です。
今回はAmazon RedshiftのMERGE SQL commandがGAになりましたので、MERGE SQL commandの何が嬉しいのかを話をしたいと思います。
SRE連載 4月号になります。
3行でまとめ
- RedshiftのMERGE SQL commandがGAになりました。
- Bulk UpsertをSQL1文で実行できるものです。 以前と比べるとスッキリします。
- 複数のデータソースから算出されるレポートの更新に使うと嬉しい。
以前のRedshiftにおけるBulk Upsertについて
ご存知かもしれませんが、『なかったらInsert、あったらUpdate』を通称Upsertといいます。
Redshiftにおける、Upsertのやり方ですがMERGE SQL commandが出る前のRedshiftでは以下のドキュメントにやり方が書いてあります。
既存のテーブルに新しいデータを追加するとき、更新と挿入を組み合わせてステージングテーブルから行うと効率的です。Amazon Redshift は merge または upsert (1 つのデータソースからテーブルを更新するコマンド) をサポートしていないものの、ステージングテーブルを作成し、このセクションに示した方法のいずれかを使ってステージングテーブルからテーブルを更新すれば、マージオペレーションを実現することができます。
例えば、以下のSQLで疑似データを作成します。
この疑似データは、あるWEBサービスのアクセスログがaccess_logs
というテーブルに格納されることを想定しています。
そして、そのアクセスログから、 hourly_active_users
という毎時あたりのアクセスuuの指標を算出するという状況を想定しています。
CREATE TABLE access_logs ( access_at timestamp, user_id bigint, method varchar(6), uri varchar(max) ) DISTSTYLE AUTO SORTKEY AUTO; INSERT INTO access_logs (access_at, user_id, method, uri) VALUES ('2023-04-25 15:30:33', 1234, 'GET', 'http://example.com/'), ('2023-04-25 15:23:34', 5678, 'GET', 'http://example.com/'), ('2023-04-25 15:31:14', 9012, 'POST', 'http://example.com/hoge'), ('2023-04-25 15:57:24', 1234, 'POST', 'http://example.com/hoge'), ('2023-04-25 15:40:52', 9012, 'GET', 'http://example.com/'), ('2023-04-25 16:21:42', 5678, 'POST', 'http://example.com/hoge'), ('2023-04-25 17:11:34', 9012, 'GET', 'http://example.com/'), ('2023-04-25 18:15:37', 5678, 'GET', 'http://example.com/'), ('2023-04-25 19:28:42', 1234, 'GET', 'http://example.com/'); CREATE TABLE hourly_active_users DISTSTYLE AUTO SORTKEY AUTO AS SELECT date_trunc('hour', access_at) as access_hour ,count(distinct user_id) as uu FROM access_logs GROUP BY 1;
上記のSQLの結果、hourly_active_users
テーブルの内容は以下のようになります。
dev=> SELECT * FROM hourly_active_users; access_hour | uu ---------------------+---- 2023-04-25 15:00:00 | 3 2023-04-25 16:00:00 | 1 2023-04-25 17:00:00 | 1 2023-04-25 18:00:00 | 1 2023-04-25 19:00:00 | 1 (5 rows)
ここで、Upsertの説明の準備のために、データを追記しておきます。
新しく、access_logs
にデータが追記することで、hourly_active_users
の更新が必要になると思います。
下記の想定は 2023-04-25 16:00:00
以降のログが遅延して到着するようなことを考えています。
-- データの追記 INSERT INTO access_logs (access_at, user_id, method, uri) VALUES ('2023-04-25 16:12:52', 9821, 'GET', 'http://example.com/'), ('2023-04-25 16:23:42', 3456, 'POST', 'http://example.com/hoge'), ('2023-04-25 17:11:34', 1212, 'GET', 'http://example.com/'), ('2023-04-25 17:23:37', 5555, 'GET', 'http://example.com/'), ('2023-04-25 17:19:42', 1234, 'GET', 'http://example.com/'), ('2023-04-25 18:52:42', 3223, 'GET', 'http://example.com/'), ('2023-04-25 19:19:42', 1234, 'GET', 'http://example.com/'), ('2023-04-25 20:31:42', 5423, 'GET', 'http://example.com/'), ('2023-04-25 21:31:42', 3232, 'GET', 'http://example.com/'); -- ここまでを①とする。
ここで、データを追記してMERGE SQL commandを使わずに、Upsertを実現してみます。
-- Staging テーブルの作成 CREATE TEMP TABLE temp_new_hourly_active_users AS SELECT date_trunc('hour', access_at) as access_hour ,count(distinct user_id) as uu FROM access_logs GROUP BY 1; BEGIN; -- 既存の行の削除 DELETE FROM hourly_active_users USING temp_new_hourly_active_users WHERE hourly_active_users.access_hour = temp_new_hourly_active_users.access_hour; -- 新しい行の追加 INSERT INTO hourly_active_users SELECT * FROM temp_new_hourly_active_users; COMMIT;
更新処理後のテーブルは、次のようになります。
dev=> SELECT * FROM hourly_active_users; access_hour | uu ---------------------+---- 2023-04-25 15:00:00 | 3 2023-04-25 16:00:00 | 3 2023-04-25 17:00:00 | 4 2023-04-25 18:00:00 | 2 2023-04-25 19:00:00 | 1 2023-04-25 20:00:00 | 1 2023-04-25 21:00:00 | 1 (7 rows)
2023-04-25 16:00:00
以降がデータとして追記されたので、2023-04-25 15:00:00
の uuは変わっておらず
2023-04-25 16:00:00
~ 2023-04-25 19:00:00
までは更新され、 2023-04-25 20:00:00
以降は追記されました。
Upsertの処理が実現できていることがわかります。
MERGE SQL commandを使う場合
先日GAとなったMERGE SQL commandを使った例を見てみましょう。
単純にUpsertするケース
テーブルの状態としては、追記されたあと(①の時点)に戻しています。
dev=> SELECT count(*) FROM access_logs; count ------- 18 (1 row) dev=> SELECT count(*) FROM hourly_active_users; count ------- 5 (1 row)
この状態で以下のMERGE SQL command を実行します。
-- Staging テーブルの作成 CREATE TEMP TABLE temp_new_hourly_active_users AS SELECT date_trunc('hour', access_at) as access_hour ,count(distinct user_id) as uu FROM access_logs GROUP BY 1; MERGE INTO hourly_active_users USING temp_new_hourly_active_users AS src ON hourly_active_users.access_hour = src.access_hour WHEN matched THEN UPDATE SET uu = src.uu WHEN NOT matched THEN INSERT VALUES (src.access_hour, src.uu);
この結果は、前述のUpsertの結果と一致します。
dev=> SELECT * FROM hourly_active_users; access_hour | uu ---------------------+---- 2023-04-25 15:00:00 | 3 2023-04-25 16:00:00 | 3 2023-04-25 17:00:00 | 4 2023-04-25 18:00:00 | 2 2023-04-25 19:00:00 | 1 2023-04-25 20:00:00 | 1 2023-04-25 21:00:00 | 1 (7 rows)
比較するとだいぶスッキリとかけますね。(トランザクションもはっていませんし。)
Nothing or Insertするケース
Upsertする方法はわかりました。 ところで、世の中には一度出力したレポートは更新したくないケースがあります。 それも、MERGE SQL commandで簡単に実行できます。
テーブルの状態をMERGE前(①の時点)に戻します。
状況の想定を思い出すと、この時点での hourly_active_users
の中身は以下のようになっています。
dev=> SELECT * FROM hourly_active_users; access_hour | uu ---------------------+---- 2023-04-25 15:00:00 | 3 2023-04-25 16:00:00 | 1 2023-04-25 17:00:00 | 1 2023-04-25 18:00:00 | 1 2023-04-25 19:00:00 | 1 (5 rows)
tempテーブルは同様に作成したとして、次のようにMERGE SQL commandを変更します。
MERGE INTO hourly_active_users USING temp_new_hourly_active_users AS src ON hourly_active_users.access_hour = src.access_hour WHEN matched THEN UPDATE SET uu = hourly_active_users.uu WHEN NOT matched THEN INSERT VALUES (src.access_hour, src.uu);
UPDATE SET
の後を src.uu
から hourly_active_users.uu
に変更しました。
dev=> SELECT * FROM hourly_active_users; access_hour | uu ---------------------+---- 2023-04-25 15:00:00 | 3 2023-04-25 16:00:00 | 1 2023-04-25 17:00:00 | 1 2023-04-25 18:00:00 | 1 2023-04-25 19:00:00 | 1 2023-04-25 20:00:00 | 1 2023-04-25 21:00:00 | 1 (7 rows)
Upsertのときとは異なり、2023-04-25 16:00:00
~ 2023-04-25 19:00:00
まではそのままで、 2023-04-25 20:00:00
以降は追記されました。
MySQLで言うところの INSERT INTO ON DUPLICATE KEY DO NOTHING
に近い結果ですね。
何が嬉しいのか?
もちろん、表現がスッキリするとか、1つのSQL commandだけで完結することとかも嬉しいですが、もっと嬉しいことがあります。
それを一言でいうなら、『MERGE SQL commandをよく見ると、UPDATEとINSERTのカラムは任意に選択できること』が嬉しいのです。
この嬉しさを説明するために、hourly_active_users
を集計する例とは別の例を考えます。
access_logs
というデータがあるのは同様として、以下のような購入ログpurchase_logs
というテーブルが追加であるとします。
CREATE TABLE purchase_logs ( purchase_at timestamp, user_id bigint, item_id bigint, quantity integer, item_unit_price integer ) DISTSTYLE AUTO SORTKEY AUTO; INSERT INTO purchase_logs (purchase_at, user_id, item_id, quantity, item_unit_price) VALUES ('2023-04-25 13:30:33', 1234, 532, 1, 1000), ('2023-04-25 15:23:34', 5678, 111, 2, 500), ('2023-04-25 16:31:14', 9012, 222, 1, 200), ('2023-04-25 19:57:24', 1234, 111, 1, 500), ('2023-04-26 00:23:24', 1234, 333, 1, 600), ('2023-04-26 11:23:24', 9012, 333, 3, 600);
そして、access_logs
から日次のアクセスユーザー数(DAU)としてactive_users
、
purchase_logs
から日次の購入数purchase_count
を集計して、以下のようなreportsテーブルに1日1行としてまとめるという例を考えます。
ymd
がレポートの集計対象の日付、created_at
がレポートの最初の作成日時、 updated_at
がレポートの更新日時とします。
CREATE TABLE reports ( ymd date, active_users integer, purchase_count integer, created_at timestamp, updated_at timestamp ) DISTSTYLE AUTO SORTKEY AUTO;
このような1日毎に主要なKPIをまとめたレポートテーブルは、ありがちだと思います。
こういうテーブルの更新は、実は意外に面倒です。
よくある戦略としては、全部のデータをまとめて集計するという方法ではないでしょうか?
この例の場合は access_logs
とpurchase_logs
を集計する以下のような集計クエリを書くと思います。
-- すべてのデータの到着を確認したら、以下のクエリを実行する。 CREATE TEMP TABLE temp_new_daily_reports AS WITH access_logs_metrics AS ( SELECT date_trunc('day', access_at)::date as ymd ,count(distinct user_id) as active_users FROM access_logs GROUP BY 1 ), purchase_logs_metrics AS ( SELECT date_trunc('day', purchase_at)::date as ymd ,count(*) as purchase_count FROM purchase_logs GROUP BY 1 ), base as ( SELECT ymd FROM access_logs_metrics UNION SELECT ymd FROM purchase_logs_metrics ) SELECT base.ymd, access_logs_metrics.active_users, purchase_logs_metrics.purchase_count, getdate() as created_at, getdate() as updated_at FROM base LEFT JOIN access_logs_metrics ON base.ymd = access_logs_metrics.ymd LEFT JOIN purchase_logs_metrics ON base.ymd = purchase_logs_metrics.ymd; -- reports テーブルをUpsertする。 BEGIN; DELETE FROM reports USING temp_new_daily_reports WHERE reports.ymd = temp_new_daily_reports.ymd; INSERT INTO reports SELECT * FROM temp_new_daily_reports; COMMIT;
しかし、その戦略で実際に運用してみると、いろいろなことがあります。
例えば、どこかのパイプラインでエラーが発生したりして、データの到着が遅れたりすることを考えます。
すべてのデータが確実に到着してから更新すること場合は、その日のレポートが1日分発生せずデータの鮮度のSLO違反になります。
かといって、歯抜けを許容して部分的にレポートを生成して更新するとなると、リトライ等でまた別の面倒なことが起きたりします。
こういう場合、パイプラインを分割して、データソースごとに部分的にレポートを更新するように作っておくと、運用上は楽になります。
今までは、このパイプラインの分割は結構手間でしたが、MERGE SQL commandを使用することでだいぶ楽になります。 具体的には、以下のような更新ができます。
-- access_logs用の集計パイプライン CREATE TEMP TABLE temp_new_daily_active_users AS SELECT date_trunc('day', access_at)::date as ymd ,count(distinct user_id) as active_users FROM access_logs GROUP BY 1; MERGE INTO reports USING temp_new_daily_active_users AS src ON reports.ymd = src.ymd WHEN matched THEN UPDATE SET active_users = src.active_users, updated_at = getdate() WHEN NOT matched THEN INSERT (ymd, active_users, created_at, updated_at) VALUES (src.ymd, src.active_users, getdate(), getdate());
このパイプラインだけ実行して、日次のactive_users
の集計結果をreports
テーブルに対してMERGE SQL commandを用いて、部分的にUpsertをできます。
この実行結果が以下になります。
dev=> select * from reports; ymd | active_users | purchase_count | created_at | updated_at ------------+--------------+----------------+---------------------+--------------------- 2023-04-25 | 10 | | 2023-04-25 10:01:43 | 2023-04-25 10:01:43 (1 row)
この時点では、purchase_count
は NULLでレポートが生成されます。
別のパイプラインで今度は日次のpurchase_count
を別途集計して更新します。
-- purchase_logs用の集計パイプライン CREATE TEMP TABLE temp_new_daily_purchase_count AS SELECT date_trunc('day', purchase_at)::date as ymd ,count(*) as purchase_count FROM purchase_logs GROUP BY 1; MERGE INTO reports USING temp_new_daily_purchase_count AS src ON reports.ymd = src.ymd WHEN matched THEN UPDATE SET purchase_count = src.purchase_count, updated_at = getdate() WHEN NOT matched THEN INSERT (ymd, purchase_count, created_at, updated_at) VALUES (src.ymd, src.purchase_count, getdate(), getdate());
この時点での、reportsテーブルは次のようになります。
dev=> select * from reports; ymd | active_users | purchase_count | created_at | updated_at ------------+--------------+----------------+---------------------+--------------------- 2023-04-25 | 10 | 4 | 2023-04-25 10:01:43 | 2023-04-25 10:17:02 2023-04-26 | | 2 | 2023-04-25 10:16:58 | 2023-04-25 10:16:58 (2 rows)
もちろん、パイプラインの実行順序が逆でも同様の結果になります。
active_users
の集計パイプラインとpurchase_count
の集計パイプラインを分割しても、容易にレポートの更新ができるのです。
access_logs
のパイプラインにエラーが発生して、データの到着が遅れたりしても、purchase_count
の値だけは正常に出力できます。
遅れてデータが到着したときには、access_logs
のパイプラインだけ実行すればよいのです。
全てのデータが到着してから集計するケースと比較すると、
SLI/SLOの観点では、reports
テーブルに対してgraceful degradationを実現したということなります。
MERGE SQL commandがGAになったことで、このような1日ごとのKPIをまとめたような横長のレポートの部分更新が可能になりました。 複数のデータソースを元に作成されるレポートテーブルの更新に関しては、 簡単にパイプラインの分割とレポートテーブルの部分更新ができるようになったのです。 Redshift使いには大変嬉しいことだと個人的には思います。
まとめ
MERGE SQL commandがGAになりました。
このSQL commandはUpsertをより簡単に実現することができるcommandになります。
さらに、このcommandは列ごとの部分的なUpsertをすることができるため、複数のデータソースから算出されるような横長のテーブルの更新でgraceful degradationを容易に実現できるようになりました。
これは、データの信頼性向上に大きく貢献します。