DynamoDB から Redshift へデータ移送する話 (RedshiftのSUPER型の利用事例)

こんにちは。技術部のSRE所属の池田です。

この記事では、昨年の2020年12月にPreviewが発表されたRedshiftの汎用データ型 SUPER の利用事例として、DynamoDBからRedshiftへデータ移送する話をします。

汎用データ型SUPERは、2020/12/9に 『Amazon Redshift が、ネイティブ JSON と半構造化データ処理のサポートを発表 (プレビュー) 』という記事が公開され、2021年04月時点でもPreview状態です。 このSUPER型は、ログやDynamoDBストリームに見られるJSONおよび半構造化データと非常に相性がよく、使い始めてみると【素晴らしい!】と言う感想を得るので、皆様も是非使ってみてください。

背景

2021年4月現在では、Tonamel という大会プラットフォームサービスのデータ基盤 構築業務・整備を行っております。
Tonamelのトーナメント表構築に関しては、昨年のアドベントカレンダーで弊社 谷脇 が熱く語ってくれました。

techblog.kayac.com techblog.kayac.com

こちらのアドベントカレンダーの記事で、一言触れてますがTonamelのトーナメント表構築システムではDynamoDBを採用しています。 データ基盤では、既存のPerl Appが参照するAurora MySQLや、トーナメント表構築システムが参照するDynamoDBなど、様々な場所に蓄積されているデータを統合分析できるようにする必要がありました。 Tonamelのデータ基盤では、その要求を満たすために、Redshiftを利用する選択をしました。
Aurora MySQLに関しては、SUPER型と同じくPreview中の MySQLに対する Amazon Redshift Federated Query を利用することで、困難なくRedshiftからデータにアクセスできました。(そちらについては機会があれば別途記事にしようと思います。)

DynamoDBに関しては、DynamoDB Streamの情報をKinesis Data Streamに流し、Kinesis Data Firehose経由でRedshiftに取り込むことで解決しました。 その際に、SUPER型が非常に役に立ちましたので、今回記事にします。

利用事例

アーキテクチャーとしては、非常にシンプルです。

f:id:ikeda-masashi:20210426120244p:plain
アーキテクチャ
Kinesis Data FirehoseからRedshiftに取り込む部分はクラスメソッド株式会社様の記事を参照していただければ幸いです。 dev.classmethod.jp

このようなアーキテクチャーでDynamoDB StreamのKinesisアダプターを使うことで、DynamoDB 上で変更があった場合のデータの差分をKinesis Data Streamに投入することができます。 Kinesis Data Streamの中には、以下のようなデータが投入されます。

{
  "awsRegion": "ap-northeast-1",
  "dynamodb": {
    "ApproximateCreationDateTime": 1614246592241,
    "Keys": {
      "HashKey": {
        "S": "HASHKEY-96b8-6822cfdd73ba"
      },
      "SortKey": {
        "S": "SORTKEY-96b8-6822cfdd73ba"
      }
    },
    "NewImage": {
      "NumberValue": {
        "N": "1234567890"
      },
      "StringValue": {
        "S": "super string"
      },
      "ListMapValue": {
        "L": [
          {
            "M": {
              "StringValue": {
                "S": "b478e725-69c0-4b3a-8674-e229544e3f4b"
              },
              "NumberValue": {
                "N": "012345678"
              }
            }
          }
        ]
      },
      "BoolValue": {
        "BOOL": false
      },
      "HashKey": {
        "S": "HASHKEY-96b8-6822cfdd73ba"
      },
      "SortKey": {
        "S": "SORTKEY-96b8-6822cfdd73ba"
      }
    },
    "OldImage": {
      "NumberValue": {
        "N": "1234567890"
      },
      "StringValue": {
        "S": "super string"
      },
      "ListMapValue": {
        "L": [
          {
            "M": {
              "StringValue": {
                "S": "b478e725-69c0-4b3a-8674-e229544e3f4b"
              },
              "NumberValue": {
                "N": "012345678"
              }
            }
          }
        ]
      },
      "BoolValue": {
        "BOOL": true
      },
      "HashKey": {
        "S": "HASHKEY-96b8-6822cfdd73ba"
      },
      "SortKey": {
        "S": "SORTKEY-96b8-6822cfdd73ba"
      }
    },
    "SizeBytes": 800
  },
  "eventID": "00000000-0000-0000-0000-000000000000",
  "eventName": "MODIFY",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "dummy_develop",
  "eventSource": "aws:dynamodb"
}

データ加工・変換について

このアーキテクチャーの場合、Kinesis Data Firehoseにデータ加工・変換用のLambda関数を設定することで、Streamに流れる情報をRedshiftに取り込むことが可能です。
従来では、Redshiftは半構造化データをそのまま取り扱うことができませんでしたので、上記のデータをRedshiftに取り込むためにはFlattenする必要がありました。 具体的には以下のようなデータへの変換が必要でした。

{
  "aws_region": "ap-northeast-1",
  "approximate_creation_date_time": 1614246592241,
  "hash_key": "HASHKEY-96b8-6822cfdd73ba",
  "sort_key": "SORTKEY-96b8-6822cfdd73ba",
  "new_image__number_value": 1234567890,
  "new_image__string_value": "super string",
  "new_image__list_map_value__string_value": "b478e725-69c0-4b3a-8674-e229544e3f4b",
  "new_image__list_map_value__number_value": 012345678,
  "new_image__list_map_value__bool_value": false,
  "new_image__hash_key": "HASHKEY-96b8-6822cfdd73ba",
  "new_image__sort_key": "SORTKEY-96b8-6822cfdd73ba",
  "old_image__number_value": 1234567890,
  "old_image__string_value": "super string",
  "old_image__list_map_value__string_value": "b478e725-69c0-4b3a-8674-e229544e3f4b",
  "old_image__list_map_value__number_value": 012345678,
  "old_image__list_map_value__bool_value": false,
  "old_image__hash_key": "HASHKEY-96b8-6822cfdd73ba",
  "old_image__sort_key": "SORTKEY-96b8-6822cfdd73ba",
  "size_bytes": 800,
  "event_iD": "00000000-0000-0000-0000-000000000000",
  "event_name": "MODIFY",
  "user_identity": null,
  "record_format": "application/json",
  "table_name": "dummy_develop",
  "event_source": "aws:dynamodb"
}
{
  //list_map_value の2要素目がある場合は追加が必要 
}

このデータ変換が特に困る理由として2点あります。

  • List要素がある場合1要素1レコードに変換する必要がある。
  • DynamoDBのテーブルの構造が変わる度に、このデータ変換をメンテナンスする必要がある。

1個目の困りごと故に、従来ではRedshift Spectrumでarrayやstructなどの型定義をして読み取ることもよくあったのではないでしょうか? その場合でも2個目の困りごとは残ったままです。

RedshiftのSUPER型を用いることで、半構造化データを取り扱えるようになるので、このデータ変換は必要なくなります。 しかし、そのまま取り込むとSQLでの利用時に以下のようなDDLとSELECTクエリになってしまい、取り扱いしづらい状態になります。

CREATE TABLE IF NOT EXISTS ddb_stream (
    awsregion      varchar ENCODE ZSTD,
    dynamodb       super   ENCODE ZSTD,
    eventid        varchar ENCODE ZSTD,
    eventname      varchar ENCODE ZSTD,
    recordformat   varchar ENCODE ZSTD,
    tablename      varchar ENCODE ZSTD,
    eventsource    varchar ENCODE ZSTD
);
SELECT 
    awsregion
    ,dynamodb.keys.hashkey.s::varchar as hash_key
    ,dynamodb.newimage.listmapvalue.l[0].m.stringvalue.s::varchar as new_image__list_map_value__0__string_value
FROM ddb_stream

SQLはcase insensitiveである都合とSUPER型の都合上、読みづらいクエリでアクセスすることになります。 SUPER型を使った場合以下のように、JSON Key をスネークケースにしておく、DynamoDB特有の型構造をFlatttenする程度の加工はしておいたほうが、Redshift上で取り扱いやすくなります。 すべてをFlattenする必要はないので加工の難易度は格段に下がると思います。 具体的には、このような形で変換すると使いやすくなるでしょう。

{
  "aws_region": "ap-northeast-1",
  "dynamodb": {
    "approximate_creation_date_time": 1614246592241,
    "keys": {
      "hash_key": "HASHKEY-96b8-6822cfdd73ba",
      "sort_key": "SORTKEY-96b8-6822cfdd73ba"
    },
    "new_image": {
      "number_value": 1234567890,
      "string_value": "super string",
      "list_map_value": [
        { 
          "string_value": "b478e725-69c0-4b3a-8674-e229544e3f4b",
          "number_value": 012345678,
        }
      ],
      "bool_value": false,
      "hash_key": "HASHKEY-96b8-6822cfdd73ba",
      "sort_key": "SORTKEY-96b8-6822cfdd73ba"
    },
    "old_image": {
      "number_value": 1234567890,
      "string_value": "super string",
      "list_map_value": [
        {
          "string_value": "b478e725-69c0-4b3a-8674-e229544e3f4b",
          "number_value": 012345678,
        }
      ],
      "bool_value": true,
      "hash_key": "HASHKEY-96b8-6822cfdd73ba",
      "sort_key": "SORTKEY-96b8-6822cfdd73ba"
    },
    "size_bytes": 800,
  },
  "event_id": "00000000-0000-0000-0000-000000000000",
  "event_name": "MODIFY",
  "user_identity": null,
  "record_format": "application/json",
  "table_name": "dummy_develop",
  "event_source": "aws:dynamodb"
}

※DynamoDBの型特有のFlattenが難しい場合は、JSON Keyをスネークケースにするだけでも取り扱いやすくなります。 さて、これを取り込む場合は以下のスキーマのテーブルを用意することで取り込めます。

CREATE TABLE IF NOT EXISTS ddb_stream (
    aws_region      varchar ENCODE ZSTD,
    dynamodb        super   ENCODE ZSTD,
    event_id        varchar ENCODE ZSTD,
    event_name      varchar ENCODE ZSTD,
    record_format   varchar ENCODE ZSTD,
    table_name      varchar ENCODE ZSTD,
    event_source    varchar ENCODE ZSTD
);

この ddb_stream テーブルからデータを読み取る場合は、以下のようなSELECTクエリが使えます。

SELECT 
    aws_region
    ,dynamodb.hash_key::varchar as hash_key
    ,dynamodb.new_image.list_map_value[0].string_value::varchar as new_image__list_map_value__0__string_value
FROM ddb_stream 

List要素をそのまま取り扱うことができ、DynamoDBのテーブルの構造の変化由来のメンテナンスからも開放されました。

最新データの取得方法に関して

ここまでで、DynamoDB上で変更があったデータの差分をRedshiftに蓄積していく状態になりました。 データの利用ケースとして、DyanamoDBの最新状態を知りたいというのはよくあるケースだと思います。  ddb_streamに流れるデータは変更の差分なので、最新状態を見るのに単純なクエリではできません。 ですので、以下のようなマテリアライズドビューを作成しておくと、非常に便利だと思います。

CREATE MATERIALIZED VIEW v_ddb  
BACKUP NO
AUTO REFRESH YES
AS 
WITH base as (
    SELECT
        dynamodb.keys.hash_key::varchar(512) as hash_key
        ,dynamodb.keys.sort_key::varchar(512) as sort_key
        ,case 
            when event_name in ('INSERT', 'MODIFY') then dynamodb.new_image
            when event_name = 'REMOVE' then dynamodb.old_image 
        end as item
        ,dynamodb.approximate_creation_date_time::bigint as updated_at
        ,table_name
    FROM ddb_stream
)
SELECT
    hash_key
    ,sort_key
    ,item
    ,updated_at
    ,table_name
FROM (
    SELECT
        *
        ,row_number() over (
                partition by hash_key, sort_key
                order by updated_at desc
        ) as rn
    FROM base
) 
WHERE rn = 1; 

このマテリアライズドビューを用いて、最新のstring_valueを知る場合は、以下のように利用することが可能です。

SELECT 
    hash_key
    ,sort_key
    ,item.string_value::varchar as string_value
FROM v_ddb

list_map_value内のstring_valueについては以下のように利用することが可能です。

SELECT
    d.hash_key
    ,d.sort_key
    ,l.string_value::varchar as string_value
FROM v_ddb as d, ddb.item.list_map_value as l

非常に直感的でわかりやすく利用できます。
その他の、SUPER型のnavigationについては『Querying semistructured data』 の参照をお願いします。具体的な型の検証や利用方法が解説されています。

もちろん、ワークフローエンジン等を用いて更にETLを加えてRedshift Spectrumで参照できるようにRedshift上でFlattenするのも良いです。 SUPER型を用いているので、DynamoDBのスキーマレスな特性をそのままに利用できるのがメリットです。

おわりに

今回はRedshiftのPreview中の機能 SUPER型を用いた、 DynamoDB から Redshift へデータ移送する話でした。 JSONや半構造化データがより取り扱いやすなるRedshiftは、AWS上でのデータ活用に関していろいろな場面で活躍することでしょう。

Redshift愛に満ちているエンジニアの皆様の一助になれば幸いです。

カヤックではデータ活用やSREに興味のあるエンジニアも募集しています