RedshiftのMERGE SQL commandがGAになりましたね。

SREチームの池田です。 今回はAmazon RedshiftのMERGE SQL commandがGAになりましたので、MERGE SQL commandの何が嬉しいのかを話をしたいと思います。
SRE連載 4月号になります。

aws.amazon.com

3行でまとめ

  • RedshiftのMERGE SQL commandがGAになりました。
  • Bulk UpsertをSQL1文で実行できるものです。 以前と比べるとスッキリします。
  • 複数のデータソースから算出されるレポートの更新に使うと嬉しい。

以前のRedshiftにおけるBulk Upsertについて

ご存知かもしれませんが、『なかったらInsert、あったらUpdate』を通称Upsertといいます。

Redshiftにおける、Upsertのやり方ですがMERGE SQL commandが出る前のRedshiftでは以下のドキュメントにやり方が書いてあります。

docs.aws.amazon.com

既存のテーブルに新しいデータを追加するとき、更新と挿入を組み合わせてステージングテーブルから行うと効率的です。Amazon Redshift は merge または upsert (1 つのデータソースからテーブルを更新するコマンド) をサポートしていないものの、ステージングテーブルを作成し、このセクションに示した方法のいずれかを使ってステージングテーブルからテーブルを更新すれば、マージオペレーションを実現することができます。

例えば、以下のSQLで疑似データを作成します。

この疑似データは、あるWEBサービスのアクセスログがaccess_logsというテーブルに格納されることを想定しています。 そして、そのアクセスログから、 hourly_active_users という毎時あたりのアクセスuuの指標を算出するという状況を想定しています。

CREATE TABLE access_logs (
    access_at timestamp,
    user_id bigint,
    method varchar(6),
    uri varchar(max)
)
DISTSTYLE AUTO 
SORTKEY AUTO;

INSERT INTO access_logs (access_at, user_id, method, uri)
VALUES 
    ('2023-04-25 15:30:33', 1234, 'GET', 'http://example.com/'),
    ('2023-04-25 15:23:34', 5678, 'GET', 'http://example.com/'),
    ('2023-04-25 15:31:14', 9012, 'POST', 'http://example.com/hoge'),
    ('2023-04-25 15:57:24', 1234, 'POST', 'http://example.com/hoge'),
    ('2023-04-25 15:40:52', 9012, 'GET', 'http://example.com/'),
    ('2023-04-25 16:21:42', 5678, 'POST', 'http://example.com/hoge'),
    ('2023-04-25 17:11:34', 9012, 'GET', 'http://example.com/'),
    ('2023-04-25 18:15:37', 5678, 'GET', 'http://example.com/'),
    ('2023-04-25 19:28:42', 1234, 'GET', 'http://example.com/');

CREATE TABLE hourly_active_users 
DISTSTYLE AUTO 
SORTKEY AUTO
AS SELECT 
    date_trunc('hour', access_at) as access_hour
    ,count(distinct user_id) as uu
FROM access_logs
GROUP BY 1;

上記のSQLの結果、hourly_active_users テーブルの内容は以下のようになります。

dev=> SELECT * FROM hourly_active_users;
     access_hour     | uu 
---------------------+----
 2023-04-25 15:00:00 |  3
 2023-04-25 16:00:00 |  1
 2023-04-25 17:00:00 |  1
 2023-04-25 18:00:00 |  1
 2023-04-25 19:00:00 |  1
(5 rows) 

ここで、Upsertの説明の準備のために、データを追記しておきます。 新しく、access_logsにデータが追記することで、hourly_active_usersの更新が必要になると思います。

下記の想定は 2023-04-25 16:00:00 以降のログが遅延して到着するようなことを考えています。

-- データの追記
INSERT INTO access_logs (access_at, user_id, method, uri)
VALUES 
    ('2023-04-25 16:12:52', 9821, 'GET', 'http://example.com/'),
    ('2023-04-25 16:23:42', 3456, 'POST', 'http://example.com/hoge'),
    ('2023-04-25 17:11:34', 1212, 'GET', 'http://example.com/'),
    ('2023-04-25 17:23:37', 5555, 'GET', 'http://example.com/'),
    ('2023-04-25 17:19:42', 1234, 'GET', 'http://example.com/'),
    ('2023-04-25 18:52:42', 3223, 'GET', 'http://example.com/'),
    ('2023-04-25 19:19:42', 1234, 'GET', 'http://example.com/'),
    ('2023-04-25 20:31:42', 5423, 'GET', 'http://example.com/'),
    ('2023-04-25 21:31:42', 3232, 'GET', 'http://example.com/');
-- ここまでを①とする。

ここで、データを追記してMERGE SQL commandを使わずに、Upsertを実現してみます。

-- Staging テーブルの作成
CREATE TEMP TABLE temp_new_hourly_active_users
AS SELECT 
    date_trunc('hour', access_at) as access_hour
    ,count(distinct user_id) as uu
FROM access_logs
GROUP BY 1;

BEGIN;
-- 既存の行の削除
DELETE FROM hourly_active_users 
USING temp_new_hourly_active_users
WHERE hourly_active_users.access_hour = temp_new_hourly_active_users.access_hour;

-- 新しい行の追加
INSERT INTO hourly_active_users
SELECT * FROM temp_new_hourly_active_users;

COMMIT;

更新処理後のテーブルは、次のようになります。

dev=> SELECT * FROM hourly_active_users;
     access_hour     | uu 
---------------------+----
 2023-04-25 15:00:00 |  3
 2023-04-25 16:00:00 |  3
 2023-04-25 17:00:00 |  4
 2023-04-25 18:00:00 |  2
 2023-04-25 19:00:00 |  1
 2023-04-25 20:00:00 |  1
 2023-04-25 21:00:00 |  1
(7 rows)

2023-04-25 16:00:00 以降がデータとして追記されたので、2023-04-25 15:00:00 の uuは変わっておらず 2023-04-25 16:00:00 ~ 2023-04-25 19:00:00 までは更新され、 2023-04-25 20:00:00 以降は追記されました。 Upsertの処理が実現できていることがわかります。

MERGE SQL commandを使う場合

先日GAとなったMERGE SQL commandを使った例を見てみましょう。

docs.aws.amazon.com

単純にUpsertするケース

テーブルの状態としては、追記されたあと(①の時点)に戻しています。

dev=> SELECT count(*) FROM access_logs;
 count 
-------
    18
(1 row)

dev=> SELECT count(*) FROM hourly_active_users;
 count 
-------
     5
(1 row)

この状態で以下のMERGE SQL command を実行します。

-- Staging テーブルの作成
CREATE TEMP TABLE temp_new_hourly_active_users
AS SELECT 
    date_trunc('hour', access_at) as access_hour
    ,count(distinct user_id) as uu
FROM access_logs
GROUP BY 1;

MERGE INTO hourly_active_users
USING temp_new_hourly_active_users AS src
ON hourly_active_users.access_hour = src.access_hour
WHEN matched THEN UPDATE SET uu = src.uu
WHEN NOT matched THEN INSERT VALUES (src.access_hour, src.uu); 

この結果は、前述のUpsertの結果と一致します。

dev=> SELECT * FROM hourly_active_users;
     access_hour     | uu 
---------------------+----
 2023-04-25 15:00:00 |  3
 2023-04-25 16:00:00 |  3
 2023-04-25 17:00:00 |  4
 2023-04-25 18:00:00 |  2
 2023-04-25 19:00:00 |  1
 2023-04-25 20:00:00 |  1
 2023-04-25 21:00:00 |  1
(7 rows)

比較するとだいぶスッキリとかけますね。(トランザクションもはっていませんし。)

Nothing or Insertするケース

Upsertする方法はわかりました。 ところで、世の中には一度出力したレポートは更新したくないケースがあります。 それも、MERGE SQL commandで簡単に実行できます。

テーブルの状態をMERGE前(①の時点)に戻します。

状況の想定を思い出すと、この時点での hourly_active_users の中身は以下のようになっています。

dev=> SELECT * FROM hourly_active_users;
     access_hour     | uu 
---------------------+----
 2023-04-25 15:00:00 |  3
 2023-04-25 16:00:00 |  1
 2023-04-25 17:00:00 |  1
 2023-04-25 18:00:00 |  1
 2023-04-25 19:00:00 |  1
(5 rows) 

tempテーブルは同様に作成したとして、次のようにMERGE SQL commandを変更します。

MERGE INTO hourly_active_users
USING temp_new_hourly_active_users AS src
ON hourly_active_users.access_hour = src.access_hour
WHEN matched THEN UPDATE SET uu = hourly_active_users.uu
WHEN NOT matched THEN INSERT VALUES (src.access_hour, src.uu); 

UPDATE SET の後を src.uu から hourly_active_users.uu に変更しました。

dev=> SELECT * FROM hourly_active_users;                                          
     access_hour     | uu 
---------------------+----
 2023-04-25 15:00:00 |  3
 2023-04-25 16:00:00 |  1
 2023-04-25 17:00:00 |  1
 2023-04-25 18:00:00 |  1
 2023-04-25 19:00:00 |  1
 2023-04-25 20:00:00 |  1
 2023-04-25 21:00:00 |  1
(7 rows)

Upsertのときとは異なり、2023-04-25 16:00:00 ~ 2023-04-25 19:00:00 まではそのままで、 2023-04-25 20:00:00 以降は追記されました。

MySQLで言うところの INSERT INTO ON DUPLICATE KEY DO NOTHING に近い結果ですね。

何が嬉しいのか?

もちろん、表現がスッキリするとか、1つのSQL commandだけで完結することとかも嬉しいですが、もっと嬉しいことがあります。

それを一言でいうなら、『MERGE SQL commandをよく見ると、UPDATEとINSERTのカラムは任意に選択できること』が嬉しいのです。

この嬉しさを説明するために、hourly_active_users を集計する例とは別の例を考えます。

access_logsというデータがあるのは同様として、以下のような購入ログpurchase_logsというテーブルが追加であるとします。

CREATE TABLE purchase_logs (
    purchase_at timestamp,
    user_id bigint,
    item_id bigint,
    quantity integer,
    item_unit_price integer
)
DISTSTYLE AUTO 
SORTKEY AUTO;

INSERT INTO purchase_logs (purchase_at, user_id, item_id, quantity, item_unit_price)
VALUES 
    ('2023-04-25 13:30:33', 1234, 532, 1, 1000),
    ('2023-04-25 15:23:34', 5678, 111, 2, 500),
    ('2023-04-25 16:31:14', 9012, 222, 1, 200),
    ('2023-04-25 19:57:24', 1234, 111, 1, 500),
    ('2023-04-26 00:23:24', 1234, 333, 1, 600),
    ('2023-04-26 11:23:24', 9012, 333, 3, 600);

そして、access_logsから日次のアクセスユーザー数(DAU)としてactive_userspurchase_logsから日次の購入数purchase_countを集計して、以下のようなreportsテーブルに1日1行としてまとめるという例を考えます。 ymdがレポートの集計対象の日付、created_atがレポートの最初の作成日時、 updated_atがレポートの更新日時とします。

CREATE TABLE reports (
    ymd date,
    active_users integer,
    purchase_count integer,
    created_at timestamp,
    updated_at timestamp
)
DISTSTYLE AUTO 
SORTKEY AUTO;

このような1日毎に主要なKPIをまとめたレポートテーブルは、ありがちだと思います。

こういうテーブルの更新は、実は意外に面倒です。
よくある戦略としては、全部のデータをまとめて集計するという方法ではないでしょうか?
この例の場合は access_logspurchase_logsを集計する以下のような集計クエリを書くと思います。

-- すべてのデータの到着を確認したら、以下のクエリを実行する。
CREATE TEMP TABLE temp_new_daily_reports
AS WITH access_logs_metrics AS (
    SELECT 
        date_trunc('day', access_at)::date as ymd
        ,count(distinct user_id) as active_users
    FROM access_logs
    GROUP BY 1 
), purchase_logs_metrics AS (
    SELECT 
        date_trunc('day', purchase_at)::date as ymd
        ,count(*) as purchase_count
    FROM purchase_logs
    GROUP BY 1
), base as (
    SELECT ymd FROM access_logs_metrics
    UNION 
    SELECT ymd FROM purchase_logs_metrics 
)
SELECT 
    base.ymd,
    access_logs_metrics.active_users,
    purchase_logs_metrics.purchase_count,
    getdate() as created_at,
    getdate() as updated_at
FROM base 
LEFT JOIN access_logs_metrics ON base.ymd = access_logs_metrics.ymd
LEFT JOIN purchase_logs_metrics ON base.ymd = purchase_logs_metrics.ymd;

-- reports テーブルをUpsertする。
BEGIN;
DELETE FROM reports 
USING temp_new_daily_reports
WHERE reports.ymd = temp_new_daily_reports.ymd;

INSERT INTO reports
SELECT * FROM temp_new_daily_reports;

COMMIT;

しかし、その戦略で実際に運用してみると、いろいろなことがあります。

例えば、どこかのパイプラインでエラーが発生したりして、データの到着が遅れたりすることを考えます。
すべてのデータが確実に到着してから更新すること場合は、その日のレポートが1日分発生せずデータの鮮度のSLO違反になります。 かといって、歯抜けを許容して部分的にレポートを生成して更新するとなると、リトライ等でまた別の面倒なことが起きたりします。

こういう場合、パイプラインを分割して、データソースごとに部分的にレポートを更新するように作っておくと、運用上は楽になります。

今までは、このパイプラインの分割は結構手間でしたが、MERGE SQL commandを使用することでだいぶ楽になります。 具体的には、以下のような更新ができます。

-- access_logs用の集計パイプライン
CREATE TEMP TABLE temp_new_daily_active_users
AS SELECT 
    date_trunc('day', access_at)::date as ymd
    ,count(distinct user_id) as active_users
FROM access_logs
GROUP BY 1;

MERGE INTO reports
USING temp_new_daily_active_users AS src
ON reports.ymd = src.ymd
WHEN matched THEN 
    UPDATE SET active_users = src.active_users, updated_at = getdate()
WHEN NOT matched THEN 
    INSERT (ymd, active_users, created_at, updated_at)
    VALUES (src.ymd, src.active_users, getdate(), getdate());

このパイプラインだけ実行して、日次のactive_usersの集計結果をreportsテーブルに対してMERGE SQL commandを用いて、部分的にUpsertをできます。 この実行結果が以下になります。

dev=> select * from reports;
    ymd     | active_users | purchase_count |     created_at      |     updated_at      
------------+--------------+----------------+---------------------+---------------------
 2023-04-25 |           10 |                | 2023-04-25 10:01:43 | 2023-04-25 10:01:43
(1 row)

この時点では、purchase_count は NULLでレポートが生成されます。 別のパイプラインで今度は日次のpurchase_countを別途集計して更新します。

-- purchase_logs用の集計パイプライン
CREATE TEMP TABLE temp_new_daily_purchase_count
AS SELECT 
    date_trunc('day', purchase_at)::date as ymd
    ,count(*) as purchase_count
FROM purchase_logs
GROUP BY 1;

MERGE INTO reports
USING temp_new_daily_purchase_count AS src
ON reports.ymd = src.ymd
WHEN matched THEN 
    UPDATE SET purchase_count = src.purchase_count, updated_at = getdate()
WHEN NOT matched THEN 
    INSERT (ymd, purchase_count, created_at, updated_at)
    VALUES (src.ymd, src.purchase_count, getdate(), getdate());

この時点での、reportsテーブルは次のようになります。

dev=> select * from reports;
    ymd     | active_users | purchase_count |     created_at      |     updated_at      
------------+--------------+----------------+---------------------+---------------------
 2023-04-25 |           10 |              4 | 2023-04-25 10:01:43 | 2023-04-25 10:17:02
 2023-04-26 |              |              2 | 2023-04-25 10:16:58 | 2023-04-25 10:16:58
(2 rows)

もちろん、パイプラインの実行順序が逆でも同様の結果になります。

active_users の集計パイプラインとpurchase_countの集計パイプラインを分割しても、容易にレポートの更新ができるのです。 access_logs のパイプラインにエラーが発生して、データの到着が遅れたりしても、purchase_countの値だけは正常に出力できます。 遅れてデータが到着したときには、access_logsのパイプラインだけ実行すればよいのです。

全てのデータが到着してから集計するケースと比較すると、 SLI/SLOの観点では、reports テーブルに対してgraceful degradationを実現したということなります。

MERGE SQL commandがGAになったことで、このような1日ごとのKPIをまとめたような横長のレポートの部分更新が可能になりました。 複数のデータソースを元に作成されるレポートテーブルの更新に関しては、 簡単にパイプラインの分割とレポートテーブルの部分更新ができるようになったのです。 Redshift使いには大変嬉しいことだと個人的には思います。

まとめ

MERGE SQL commandがGAになりました。 このSQL commandはUpsertをより簡単に実現することができるcommandになります。
さらに、このcommandは列ごとの部分的なUpsertをすることができるため、複数のデータソースから算出されるような横長のテーブルの更新でgraceful degradationを容易に実現できるようになりました。 これは、データの信頼性向上に大きく貢献します。

カヤックではデータの信頼性を求めるエンジニアを募集しています! hubspot.kayac.com

【後編】YAPC::Kyoto 2023 の紙絵馬を海岸でセルフお焚き上げしました

技術部の小池です。

この記事は 【前編】YAPC::Kyoto 2023 におみくじと紙絵馬のブースを出展しました の後編です。

お焚き上げ

前編の記事では以下のように書いていました。

みなさんの願いがこもった紙絵馬は鎌倉のオフィスにすべて持ち帰っており、神社で祈祷をする準備を進めております。本当は祈祷をしてから記事を書く予定でしたが、少々時間がかかりそうなので前後編の2つに分け後編の記事にて祈祷の様子をお送りしていきます。

残念ながら神社での祈祷は断念しましたが、関係者で相談し海岸で焚き火をしてセルフお焚き上げを行いましたので後編としてお届けします。

お焚き上げは登壇者の macopy を含む有志の社員で逗子海岸*1で行いました。みなさんが書いた紙絵馬、焚き火台、火消し壺、着火グッズ、薪を用意して逗子海岸に向かいます。

薪割りをして火の準備をする様子
薪割りをして火の準備をする様子

無事に火がつきました
無事に火がつきました

火がついたので紙絵馬をひとつずつ読み上げながら投下していきます。みなさんにブースでメッセージを書いていただいた光景が蘇ってきます。4月からエンジニアとしてデビューする方、4月に大規模メンテナンスを控えている方、いろいろな方とお話させていただきました。

運営の papix さんの YAPC 成功祈願
運営の papix さんの YAPC 成功祈願

給料5000倍!
給料5000倍!

みなさんの願いが叶いますように…
みなさんの願いが叶いますように…

69枚あった紙絵馬はすべて火に焚べられ、最後は火の後の炭を火消し壺に入れて焚き火を終了しました。ゴミは海岸に置いていってはいけないのですべて持ち帰ります。夜の逗子海岸は少し寒く、3月のまだ冬の空気だった京都の夜を思い起こしました。

余談ですが YAPC で行ったこの紙絵馬の企画は社内で評判がよく、社内のフリースペース的なエリアで紙絵馬を飾れるようにすることが決まりました。

紙絵馬の紹介

いくつか話題になった紙絵馬を紹介したいと思います。

社長がCIをこわしませんように
社長がCIをこわしませんように

会場でも人気が高かった紙絵馬です。現在 CI の調子はいかがでしょうか?

業界全体が前に進みますように
業界全体が前に進みますように

昨年カヤックと 合同勉強会 を開催した PR TIMES catatsuy さんの紙絵馬です。前に進んでいきたいなあと思いました。

ChatGPTにまけない!
ChatGPTにまけない!

意気込みを感じます。AI に負けない意気込み紙絵馬はほかにもいくつかありました。一方で AI に代わりに稼いでほしいというものもありました。

新卒1年目うまくいきますように
新卒1年目うまくいきますように

新しい環境でのご活躍を願っています!

このほかにも多くの意気込みや願いが綴られていました。ありがとうございました。

登壇の動画が公開されています

YAPC::Kyoto 2023 の 登壇動画 が公開されています。カヤックから登壇した macopykoluku の動画もぜひご覧ください。

デプロイ今昔物語 〜CGIからサーバーレスまで〜 macopy youtu.be

日常業務のカイゼンで図る開発チームへの貢献 koluku youtu.be

最後に

ブログを書くまでが YAPC! ということで後編のブログも書き、カヤックの YAPC::Kyoto 2023 が完全終了しました。ご参加いただいたみなさま、関係各企業のみなさま、運営のみなさま、ありがとうございました。

カヤックではテックイベントを盛り上げるエンジニアも募集しています!

www.kayac.com

hubspot.kayac.com

*1:逗子海岸 は海水浴場開設期間外では直火でなければ焚き火が許可されています。