RedshiftのMERGE SQL commandがGAになりましたね。

SREチームの池田です。 今回はAmazon RedshiftのMERGE SQL commandがGAになりましたので、MERGE SQL commandの何が嬉しいのかを話をしたいと思います。
SRE連載 4月号になります。

aws.amazon.com

3行でまとめ

  • RedshiftのMERGE SQL commandがGAになりました。
  • Bulk UpsertをSQL1文で実行できるものです。 以前と比べるとスッキリします。
  • 複数のデータソースから算出されるレポートの更新に使うと嬉しい。

以前のRedshiftにおけるBulk Upsertについて

ご存知かもしれませんが、『なかったらInsert、あったらUpdate』を通称Upsertといいます。

Redshiftにおける、Upsertのやり方ですがMERGE SQL commandが出る前のRedshiftでは以下のドキュメントにやり方が書いてあります。

docs.aws.amazon.com

既存のテーブルに新しいデータを追加するとき、更新と挿入を組み合わせてステージングテーブルから行うと効率的です。Amazon Redshift は merge または upsert (1 つのデータソースからテーブルを更新するコマンド) をサポートしていないものの、ステージングテーブルを作成し、このセクションに示した方法のいずれかを使ってステージングテーブルからテーブルを更新すれば、マージオペレーションを実現することができます。

例えば、以下のSQLで疑似データを作成します。

この疑似データは、あるWEBサービスのアクセスログがaccess_logsというテーブルに格納されることを想定しています。 そして、そのアクセスログから、 hourly_active_users という毎時あたりのアクセスuuの指標を算出するという状況を想定しています。

CREATE TABLE access_logs (
    access_at timestamp,
    user_id bigint,
    method varchar(6),
    uri varchar(max)
)
DISTSTYLE AUTO 
SORTKEY AUTO;

INSERT INTO access_logs (access_at, user_id, method, uri)
VALUES 
    ('2023-04-25 15:30:33', 1234, 'GET', 'http://example.com/'),
    ('2023-04-25 15:23:34', 5678, 'GET', 'http://example.com/'),
    ('2023-04-25 15:31:14', 9012, 'POST', 'http://example.com/hoge'),
    ('2023-04-25 15:57:24', 1234, 'POST', 'http://example.com/hoge'),
    ('2023-04-25 15:40:52', 9012, 'GET', 'http://example.com/'),
    ('2023-04-25 16:21:42', 5678, 'POST', 'http://example.com/hoge'),
    ('2023-04-25 17:11:34', 9012, 'GET', 'http://example.com/'),
    ('2023-04-25 18:15:37', 5678, 'GET', 'http://example.com/'),
    ('2023-04-25 19:28:42', 1234, 'GET', 'http://example.com/');

CREATE TABLE hourly_active_users 
DISTSTYLE AUTO 
SORTKEY AUTO
AS SELECT 
    date_trunc('hour', access_at) as access_hour
    ,count(distinct user_id) as uu
FROM access_logs
GROUP BY 1;

上記のSQLの結果、hourly_active_users テーブルの内容は以下のようになります。

dev=> SELECT * FROM hourly_active_users;
     access_hour     | uu 
---------------------+----
 2023-04-25 15:00:00 |  3
 2023-04-25 16:00:00 |  1
 2023-04-25 17:00:00 |  1
 2023-04-25 18:00:00 |  1
 2023-04-25 19:00:00 |  1
(5 rows) 

ここで、Upsertの説明の準備のために、データを追記しておきます。 新しく、access_logsにデータが追記することで、hourly_active_usersの更新が必要になると思います。

下記の想定は 2023-04-25 16:00:00 以降のログが遅延して到着するようなことを考えています。

-- データの追記
INSERT INTO access_logs (access_at, user_id, method, uri)
VALUES 
    ('2023-04-25 16:12:52', 9821, 'GET', 'http://example.com/'),
    ('2023-04-25 16:23:42', 3456, 'POST', 'http://example.com/hoge'),
    ('2023-04-25 17:11:34', 1212, 'GET', 'http://example.com/'),
    ('2023-04-25 17:23:37', 5555, 'GET', 'http://example.com/'),
    ('2023-04-25 17:19:42', 1234, 'GET', 'http://example.com/'),
    ('2023-04-25 18:52:42', 3223, 'GET', 'http://example.com/'),
    ('2023-04-25 19:19:42', 1234, 'GET', 'http://example.com/'),
    ('2023-04-25 20:31:42', 5423, 'GET', 'http://example.com/'),
    ('2023-04-25 21:31:42', 3232, 'GET', 'http://example.com/');
-- ここまでを①とする。

ここで、データを追記してMERGE SQL commandを使わずに、Upsertを実現してみます。

-- Staging テーブルの作成
CREATE TEMP TABLE temp_new_hourly_active_users
AS SELECT 
    date_trunc('hour', access_at) as access_hour
    ,count(distinct user_id) as uu
FROM access_logs
GROUP BY 1;

BEGIN;
-- 既存の行の削除
DELETE FROM hourly_active_users 
USING temp_new_hourly_active_users
WHERE hourly_active_users.access_hour = temp_new_hourly_active_users.access_hour;

-- 新しい行の追加
INSERT INTO hourly_active_users
SELECT * FROM temp_new_hourly_active_users;

COMMIT;

更新処理後のテーブルは、次のようになります。

dev=> SELECT * FROM hourly_active_users;
     access_hour     | uu 
---------------------+----
 2023-04-25 15:00:00 |  3
 2023-04-25 16:00:00 |  3
 2023-04-25 17:00:00 |  4
 2023-04-25 18:00:00 |  2
 2023-04-25 19:00:00 |  1
 2023-04-25 20:00:00 |  1
 2023-04-25 21:00:00 |  1
(7 rows)

2023-04-25 16:00:00 以降がデータとして追記されたので、2023-04-25 15:00:00 の uuは変わっておらず 2023-04-25 16:00:00 ~ 2023-04-25 19:00:00 までは更新され、 2023-04-25 20:00:00 以降は追記されました。 Upsertの処理が実現できていることがわかります。

MERGE SQL commandを使う場合

先日GAとなったMERGE SQL commandを使った例を見てみましょう。

docs.aws.amazon.com

単純にUpsertするケース

テーブルの状態としては、追記されたあと(①の時点)に戻しています。

dev=> SELECT count(*) FROM access_logs;
 count 
-------
    18
(1 row)

dev=> SELECT count(*) FROM hourly_active_users;
 count 
-------
     5
(1 row)

この状態で以下のMERGE SQL command を実行します。

-- Staging テーブルの作成
CREATE TEMP TABLE temp_new_hourly_active_users
AS SELECT 
    date_trunc('hour', access_at) as access_hour
    ,count(distinct user_id) as uu
FROM access_logs
GROUP BY 1;

MERGE INTO hourly_active_users
USING temp_new_hourly_active_users AS src
ON hourly_active_users.access_hour = src.access_hour
WHEN matched THEN UPDATE SET uu = src.uu
WHEN NOT matched THEN INSERT VALUES (src.access_hour, src.uu); 

この結果は、前述のUpsertの結果と一致します。

dev=> SELECT * FROM hourly_active_users;
     access_hour     | uu 
---------------------+----
 2023-04-25 15:00:00 |  3
 2023-04-25 16:00:00 |  3
 2023-04-25 17:00:00 |  4
 2023-04-25 18:00:00 |  2
 2023-04-25 19:00:00 |  1
 2023-04-25 20:00:00 |  1
 2023-04-25 21:00:00 |  1
(7 rows)

比較するとだいぶスッキリとかけますね。(トランザクションもはっていませんし。)

Nothing or Insertするケース

Upsertする方法はわかりました。 ところで、世の中には一度出力したレポートは更新したくないケースがあります。 それも、MERGE SQL commandで簡単に実行できます。

テーブルの状態をMERGE前(①の時点)に戻します。

状況の想定を思い出すと、この時点での hourly_active_users の中身は以下のようになっています。

dev=> SELECT * FROM hourly_active_users;
     access_hour     | uu 
---------------------+----
 2023-04-25 15:00:00 |  3
 2023-04-25 16:00:00 |  1
 2023-04-25 17:00:00 |  1
 2023-04-25 18:00:00 |  1
 2023-04-25 19:00:00 |  1
(5 rows) 

tempテーブルは同様に作成したとして、次のようにMERGE SQL commandを変更します。

MERGE INTO hourly_active_users
USING temp_new_hourly_active_users AS src
ON hourly_active_users.access_hour = src.access_hour
WHEN matched THEN UPDATE SET uu = hourly_active_users.uu
WHEN NOT matched THEN INSERT VALUES (src.access_hour, src.uu); 

UPDATE SET の後を src.uu から hourly_active_users.uu に変更しました。

dev=> SELECT * FROM hourly_active_users;                                          
     access_hour     | uu 
---------------------+----
 2023-04-25 15:00:00 |  3
 2023-04-25 16:00:00 |  1
 2023-04-25 17:00:00 |  1
 2023-04-25 18:00:00 |  1
 2023-04-25 19:00:00 |  1
 2023-04-25 20:00:00 |  1
 2023-04-25 21:00:00 |  1
(7 rows)

Upsertのときとは異なり、2023-04-25 16:00:00 ~ 2023-04-25 19:00:00 まではそのままで、 2023-04-25 20:00:00 以降は追記されました。

MySQLで言うところの INSERT INTO ON DUPLICATE KEY DO NOTHING に近い結果ですね。

何が嬉しいのか?

もちろん、表現がスッキリするとか、1つのSQL commandだけで完結することとかも嬉しいですが、もっと嬉しいことがあります。

それを一言でいうなら、『MERGE SQL commandをよく見ると、UPDATEとINSERTのカラムは任意に選択できること』が嬉しいのです。

この嬉しさを説明するために、hourly_active_users を集計する例とは別の例を考えます。

access_logsというデータがあるのは同様として、以下のような購入ログpurchase_logsというテーブルが追加であるとします。

CREATE TABLE purchase_logs (
    purchase_at timestamp,
    user_id bigint,
    item_id bigint,
    quantity integer,
    item_unit_price integer
)
DISTSTYLE AUTO 
SORTKEY AUTO;

INSERT INTO purchase_logs (purchase_at, user_id, item_id, quantity, item_unit_price)
VALUES 
    ('2023-04-25 13:30:33', 1234, 532, 1, 1000),
    ('2023-04-25 15:23:34', 5678, 111, 2, 500),
    ('2023-04-25 16:31:14', 9012, 222, 1, 200),
    ('2023-04-25 19:57:24', 1234, 111, 1, 500),
    ('2023-04-26 00:23:24', 1234, 333, 1, 600),
    ('2023-04-26 11:23:24', 9012, 333, 3, 600);

そして、access_logsから日次のアクセスユーザー数(DAU)としてactive_userspurchase_logsから日次の購入数purchase_countを集計して、以下のようなreportsテーブルに1日1行としてまとめるという例を考えます。 ymdがレポートの集計対象の日付、created_atがレポートの最初の作成日時、 updated_atがレポートの更新日時とします。

CREATE TABLE reports (
    ymd date,
    active_users integer,
    purchase_count integer,
    created_at timestamp,
    updated_at timestamp
)
DISTSTYLE AUTO 
SORTKEY AUTO;

このような1日毎に主要なKPIをまとめたレポートテーブルは、ありがちだと思います。

こういうテーブルの更新は、実は意外に面倒です。
よくある戦略としては、全部のデータをまとめて集計するという方法ではないでしょうか?
この例の場合は access_logspurchase_logsを集計する以下のような集計クエリを書くと思います。

-- すべてのデータの到着を確認したら、以下のクエリを実行する。
CREATE TEMP TABLE temp_new_daily_reports
AS WITH access_logs_metrics AS (
    SELECT 
        date_trunc('day', access_at)::date as ymd
        ,count(distinct user_id) as active_users
    FROM access_logs
    GROUP BY 1 
), purchase_logs_metrics AS (
    SELECT 
        date_trunc('day', purchase_at)::date as ymd
        ,count(*) as purchase_count
    FROM purchase_logs
    GROUP BY 1
), base as (
    SELECT ymd FROM access_logs_metrics
    UNION 
    SELECT ymd FROM purchase_logs_metrics 
)
SELECT 
    base.ymd,
    access_logs_metrics.active_users,
    purchase_logs_metrics.purchase_count,
    getdate() as created_at,
    getdate() as updated_at
FROM base 
LEFT JOIN access_logs_metrics ON base.ymd = access_logs_metrics.ymd
LEFT JOIN purchase_logs_metrics ON base.ymd = purchase_logs_metrics.ymd;

-- reports テーブルをUpsertする。
BEGIN;
DELETE FROM reports 
USING temp_new_daily_reports
WHERE reports.ymd = temp_new_daily_reports.ymd;

INSERT INTO reports
SELECT * FROM temp_new_daily_reports;

COMMIT;

しかし、その戦略で実際に運用してみると、いろいろなことがあります。

例えば、どこかのパイプラインでエラーが発生したりして、データの到着が遅れたりすることを考えます。
すべてのデータが確実に到着してから更新すること場合は、その日のレポートが1日分発生せずデータの鮮度のSLO違反になります。 かといって、歯抜けを許容して部分的にレポートを生成して更新するとなると、リトライ等でまた別の面倒なことが起きたりします。

こういう場合、パイプラインを分割して、データソースごとに部分的にレポートを更新するように作っておくと、運用上は楽になります。

今までは、このパイプラインの分割は結構手間でしたが、MERGE SQL commandを使用することでだいぶ楽になります。 具体的には、以下のような更新ができます。

-- access_logs用の集計パイプライン
CREATE TEMP TABLE temp_new_daily_active_users
AS SELECT 
    date_trunc('day', access_at)::date as ymd
    ,count(distinct user_id) as active_users
FROM access_logs
GROUP BY 1;

MERGE INTO reports
USING temp_new_daily_active_users AS src
ON reports.ymd = src.ymd
WHEN matched THEN 
    UPDATE SET active_users = src.active_users, updated_at = getdate()
WHEN NOT matched THEN 
    INSERT (ymd, active_users, created_at, updated_at)
    VALUES (src.ymd, src.active_users, getdate(), getdate());

このパイプラインだけ実行して、日次のactive_usersの集計結果をreportsテーブルに対してMERGE SQL commandを用いて、部分的にUpsertをできます。 この実行結果が以下になります。

dev=> select * from reports;
    ymd     | active_users | purchase_count |     created_at      |     updated_at      
------------+--------------+----------------+---------------------+---------------------
 2023-04-25 |           10 |                | 2023-04-25 10:01:43 | 2023-04-25 10:01:43
(1 row)

この時点では、purchase_count は NULLでレポートが生成されます。 別のパイプラインで今度は日次のpurchase_countを別途集計して更新します。

-- purchase_logs用の集計パイプライン
CREATE TEMP TABLE temp_new_daily_purchase_count
AS SELECT 
    date_trunc('day', purchase_at)::date as ymd
    ,count(*) as purchase_count
FROM purchase_logs
GROUP BY 1;

MERGE INTO reports
USING temp_new_daily_purchase_count AS src
ON reports.ymd = src.ymd
WHEN matched THEN 
    UPDATE SET purchase_count = src.purchase_count, updated_at = getdate()
WHEN NOT matched THEN 
    INSERT (ymd, purchase_count, created_at, updated_at)
    VALUES (src.ymd, src.purchase_count, getdate(), getdate());

この時点での、reportsテーブルは次のようになります。

dev=> select * from reports;
    ymd     | active_users | purchase_count |     created_at      |     updated_at      
------------+--------------+----------------+---------------------+---------------------
 2023-04-25 |           10 |              4 | 2023-04-25 10:01:43 | 2023-04-25 10:17:02
 2023-04-26 |              |              2 | 2023-04-25 10:16:58 | 2023-04-25 10:16:58
(2 rows)

もちろん、パイプラインの実行順序が逆でも同様の結果になります。

active_users の集計パイプラインとpurchase_countの集計パイプラインを分割しても、容易にレポートの更新ができるのです。 access_logs のパイプラインにエラーが発生して、データの到着が遅れたりしても、purchase_countの値だけは正常に出力できます。 遅れてデータが到着したときには、access_logsのパイプラインだけ実行すればよいのです。

全てのデータが到着してから集計するケースと比較すると、 SLI/SLOの観点では、reports テーブルに対してgraceful degradationを実現したということなります。

MERGE SQL commandがGAになったことで、このような1日ごとのKPIをまとめたような横長のレポートの部分更新が可能になりました。 複数のデータソースを元に作成されるレポートテーブルの更新に関しては、 簡単にパイプラインの分割とレポートテーブルの部分更新ができるようになったのです。 Redshift使いには大変嬉しいことだと個人的には思います。

まとめ

MERGE SQL commandがGAになりました。 このSQL commandはUpsertをより簡単に実現することができるcommandになります。
さらに、このcommandは列ごとの部分的なUpsertをすることができるため、複数のデータソースから算出されるような横長のテーブルの更新でgraceful degradationを容易に実現できるようになりました。 これは、データの信頼性向上に大きく貢献します。

カヤックではデータの信頼性を求めるエンジニアを募集しています! hubspot.kayac.com