DynamoDB から Redshift へデータ移送する話 (RedshiftのSUPER型の利用事例)

こんにちは。技術部のSRE所属の池田です。

この記事では、昨年の2020年12月にPreviewが発表されたRedshiftの汎用データ型 SUPER の利用事例として、DynamoDBからRedshiftへデータ移送する話をします。

汎用データ型SUPERは、2020/12/9に 『Amazon Redshift が、ネイティブ JSON と半構造化データ処理のサポートを発表 (プレビュー) 』という記事が公開され、2021年04月時点でもPreview状態です。 このSUPER型は、ログやDynamoDBストリームに見られるJSONおよび半構造化データと非常に相性がよく、使い始めてみると【素晴らしい!】と言う感想を得るので、皆様も是非使ってみてください。

背景

2021年4月現在では、Tonamel という大会プラットフォームサービスのデータ基盤 構築業務・整備を行っております。
Tonamelのトーナメント表構築に関しては、昨年のアドベントカレンダーで弊社 谷脇 が熱く語ってくれました。

techblog.kayac.com techblog.kayac.com

こちらのアドベントカレンダーの記事で、一言触れてますがTonamelのトーナメント表構築システムではDynamoDBを採用しています。 データ基盤では、既存のPerl Appが参照するAurora MySQLや、トーナメント表構築システムが参照するDynamoDBなど、様々な場所に蓄積されているデータを統合分析できるようにする必要がありました。 Tonamelのデータ基盤では、その要求を満たすために、Redshiftを利用する選択をしました。
Aurora MySQLに関しては、SUPER型と同じくPreview中の MySQLに対する Amazon Redshift Federated Query を利用することで、困難なくRedshiftからデータにアクセスできました。(そちらについては機会があれば別途記事にしようと思います。)

DynamoDBに関しては、DynamoDB Streamの情報をKinesis Data Streamに流し、Kinesis Data Firehose経由でRedshiftに取り込むことで解決しました。 その際に、SUPER型が非常に役に立ちましたので、今回記事にします。

利用事例

アーキテクチャーとしては、非常にシンプルです。

f:id:ikeda-masashi:20210426120244p:plain
アーキテクチャ
Kinesis Data FirehoseからRedshiftに取り込む部分はクラスメソッド株式会社様の記事を参照していただければ幸いです。 dev.classmethod.jp

このようなアーキテクチャーでDynamoDB StreamのKinesisアダプターを使うことで、DynamoDB 上で変更があった場合のデータの差分をKinesis Data Streamに投入することができます。 Kinesis Data Streamの中には、以下のようなデータが投入されます。

{
  "awsRegion": "ap-northeast-1",
  "dynamodb": {
    "ApproximateCreationDateTime": 1614246592241,
    "Keys": {
      "HashKey": {
        "S": "HASHKEY-96b8-6822cfdd73ba"
      },
      "SortKey": {
        "S": "SORTKEY-96b8-6822cfdd73ba"
      }
    },
    "NewImage": {
      "NumberValue": {
        "N": "1234567890"
      },
      "StringValue": {
        "S": "super string"
      },
      "ListMapValue": {
        "L": [
          {
            "M": {
              "StringValue": {
                "S": "b478e725-69c0-4b3a-8674-e229544e3f4b"
              },
              "NumberValue": {
                "N": "012345678"
              }
            }
          }
        ]
      },
      "BoolValue": {
        "BOOL": false
      },
      "HashKey": {
        "S": "HASHKEY-96b8-6822cfdd73ba"
      },
      "SortKey": {
        "S": "SORTKEY-96b8-6822cfdd73ba"
      }
    },
    "OldImage": {
      "NumberValue": {
        "N": "1234567890"
      },
      "StringValue": {
        "S": "super string"
      },
      "ListMapValue": {
        "L": [
          {
            "M": {
              "StringValue": {
                "S": "b478e725-69c0-4b3a-8674-e229544e3f4b"
              },
              "NumberValue": {
                "N": "012345678"
              }
            }
          }
        ]
      },
      "BoolValue": {
        "BOOL": true
      },
      "HashKey": {
        "S": "HASHKEY-96b8-6822cfdd73ba"
      },
      "SortKey": {
        "S": "SORTKEY-96b8-6822cfdd73ba"
      }
    },
    "SizeBytes": 800
  },
  "eventID": "00000000-0000-0000-0000-000000000000",
  "eventName": "MODIFY",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "dummy_develop",
  "eventSource": "aws:dynamodb"
}

データ加工・変換について

このアーキテクチャーの場合、Kinesis Data Firehoseにデータ加工・変換用のLambda関数を設定することで、Streamに流れる情報をRedshiftに取り込むことが可能です。
従来では、Redshiftは半構造化データをそのまま取り扱うことができませんでしたので、上記のデータをRedshiftに取り込むためにはFlattenする必要がありました。 具体的には以下のようなデータへの変換が必要でした。

{
  "aws_region": "ap-northeast-1",
  "approximate_creation_date_time": 1614246592241,
  "hash_key": "HASHKEY-96b8-6822cfdd73ba",
  "sort_key": "SORTKEY-96b8-6822cfdd73ba",
  "new_image__number_value": 1234567890,
  "new_image__string_value": "super string",
  "new_image__list_map_value__string_value": "b478e725-69c0-4b3a-8674-e229544e3f4b",
  "new_image__list_map_value__number_value": 012345678,
  "new_image__list_map_value__bool_value": false,
  "new_image__hash_key": "HASHKEY-96b8-6822cfdd73ba",
  "new_image__sort_key": "SORTKEY-96b8-6822cfdd73ba",
  "old_image__number_value": 1234567890,
  "old_image__string_value": "super string",
  "old_image__list_map_value__string_value": "b478e725-69c0-4b3a-8674-e229544e3f4b",
  "old_image__list_map_value__number_value": 012345678,
  "old_image__list_map_value__bool_value": false,
  "old_image__hash_key": "HASHKEY-96b8-6822cfdd73ba",
  "old_image__sort_key": "SORTKEY-96b8-6822cfdd73ba",
  "size_bytes": 800,
  "event_iD": "00000000-0000-0000-0000-000000000000",
  "event_name": "MODIFY",
  "user_identity": null,
  "record_format": "application/json",
  "table_name": "dummy_develop",
  "event_source": "aws:dynamodb"
}
{
  //list_map_value の2要素目がある場合は追加が必要 
}

このデータ変換が特に困る理由として2点あります。

  • List要素がある場合1要素1レコードに変換する必要がある。
  • DynamoDBのテーブルの構造が変わる度に、このデータ変換をメンテナンスする必要がある。

1個目の困りごと故に、従来ではRedshift Spectrumでarrayやstructなどの型定義をして読み取ることもよくあったのではないでしょうか? その場合でも2個目の困りごとは残ったままです。

RedshiftのSUPER型を用いることで、半構造化データを取り扱えるようになるので、このデータ変換は必要なくなります。 しかし、そのまま取り込むとSQLでの利用時に以下のようなDDLとSELECTクエリになってしまい、取り扱いしづらい状態になります。

CREATE TABLE IF NOT EXISTS ddb_stream (
    awsregion      varchar ENCODE ZSTD,
    dynamodb       super   ENCODE ZSTD,
    eventid        varchar ENCODE ZSTD,
    eventname      varchar ENCODE ZSTD,
    recordformat   varchar ENCODE ZSTD,
    tablename      varchar ENCODE ZSTD,
    eventsource    varchar ENCODE ZSTD
);
SELECT 
    awsregion
    ,dynamodb.keys.hashkey.s::varchar as hash_key
    ,dynamodb.newimage.listmapvalue.l[0].m.stringvalue.s::varchar as new_image__list_map_value__0__string_value
FROM ddb_stream

SQLはcase insensitiveである都合とSUPER型の都合上、読みづらいクエリでアクセスすることになります。 SUPER型を使った場合以下のように、JSON Key をスネークケースにしておく、DynamoDB特有の型構造をFlatttenする程度の加工はしておいたほうが、Redshift上で取り扱いやすくなります。 すべてをFlattenする必要はないので加工の難易度は格段に下がると思います。 具体的には、このような形で変換すると使いやすくなるでしょう。

{
  "aws_region": "ap-northeast-1",
  "dynamodb": {
    "approximate_creation_date_time": 1614246592241,
    "keys": {
      "hash_key": "HASHKEY-96b8-6822cfdd73ba",
      "sort_key": "SORTKEY-96b8-6822cfdd73ba"
    },
    "new_image": {
      "number_value": 1234567890,
      "string_value": "super string",
      "list_map_value": [
        { 
          "string_value": "b478e725-69c0-4b3a-8674-e229544e3f4b",
          "number_value": 012345678,
        }
      ],
      "bool_value": false,
      "hash_key": "HASHKEY-96b8-6822cfdd73ba",
      "sort_key": "SORTKEY-96b8-6822cfdd73ba"
    },
    "old_image": {
      "number_value": 1234567890,
      "string_value": "super string",
      "list_map_value": [
        {
          "string_value": "b478e725-69c0-4b3a-8674-e229544e3f4b",
          "number_value": 012345678,
        }
      ],
      "bool_value": true,
      "hash_key": "HASHKEY-96b8-6822cfdd73ba",
      "sort_key": "SORTKEY-96b8-6822cfdd73ba"
    },
    "size_bytes": 800,
  },
  "event_id": "00000000-0000-0000-0000-000000000000",
  "event_name": "MODIFY",
  "user_identity": null,
  "record_format": "application/json",
  "table_name": "dummy_develop",
  "event_source": "aws:dynamodb"
}

※DynamoDBの型特有のFlattenが難しい場合は、JSON Keyをスネークケースにするだけでも取り扱いやすくなります。 さて、これを取り込む場合は以下のスキーマのテーブルを用意することで取り込めます。

CREATE TABLE IF NOT EXISTS ddb_stream (
    aws_region      varchar ENCODE ZSTD,
    dynamodb        super   ENCODE ZSTD,
    event_id        varchar ENCODE ZSTD,
    event_name      varchar ENCODE ZSTD,
    record_format   varchar ENCODE ZSTD,
    table_name      varchar ENCODE ZSTD,
    event_source    varchar ENCODE ZSTD
);

この ddb_stream テーブルからデータを読み取る場合は、以下のようなSELECTクエリが使えます。

SELECT 
    aws_region
    ,dynamodb.hash_key::varchar as hash_key
    ,dynamodb.new_image.list_map_value[0].string_value::varchar as new_image__list_map_value__0__string_value
FROM ddb_stream 

List要素をそのまま取り扱うことができ、DynamoDBのテーブルの構造の変化由来のメンテナンスからも開放されました。

最新データの取得方法に関して

ここまでで、DynamoDB上で変更があったデータの差分をRedshiftに蓄積していく状態になりました。 データの利用ケースとして、DyanamoDBの最新状態を知りたいというのはよくあるケースだと思います。  ddb_streamに流れるデータは変更の差分なので、最新状態を見るのに単純なクエリではできません。 ですので、以下のようなマテリアライズドビューを作成しておくと、非常に便利だと思います。

CREATE MATERIALIZED VIEW v_ddb  
BACKUP NO
AUTO REFRESH YES
AS 
WITH base as (
    SELECT
        dynamodb.keys.hash_key::varchar(512) as hash_key
        ,dynamodb.keys.sort_key::varchar(512) as sort_key
        ,case 
            when event_name in ('INSERT', 'MODIFY') then dynamodb.new_image
            when event_name = 'REMOVE' then dynamodb.old_image 
        end as item
        ,dynamodb.approximate_creation_date_time::bigint as updated_at
        ,table_name
    FROM ddb_stream
)
SELECT
    hash_key
    ,sort_key
    ,item
    ,updated_at
    ,table_name
FROM (
    SELECT
        *
        ,row_number() over (
                partition by hash_key, sort_key
                order by updated_at desc
        ) as rn
    FROM base
) 
WHERE rn = 1; 

このマテリアライズドビューを用いて、最新のstring_valueを知る場合は、以下のように利用することが可能です。

SELECT 
    hash_key
    ,sort_key
    ,item.string_value::varchar as string_value
FROM v_ddb

list_map_value内のstring_valueについては以下のように利用することが可能です。

SELECT
    d.hash_key
    ,d.sort_key
    ,l.string_value::varchar as string_value
FROM v_ddb as d, ddb.item.list_map_value as l

非常に直感的でわかりやすく利用できます。
その他の、SUPER型のnavigationについては『Querying semistructured data』 の参照をお願いします。具体的な型の検証や利用方法が解説されています。

もちろん、ワークフローエンジン等を用いて更にETLを加えてRedshift Spectrumで参照できるようにRedshift上でFlattenするのも良いです。 SUPER型を用いているので、DynamoDBのスキーマレスな特性をそのままに利用できるのがメリットです。

おわりに

今回はRedshiftのPreview中の機能 SUPER型を用いた、 DynamoDB から Redshift へデータ移送する話でした。 JSONや半構造化データがより取り扱いやすなるRedshiftは、AWS上でのデータ活用に関していろいろな場面で活躍することでしょう。

Redshift愛に満ちているエンジニアの皆様の一助になれば幸いです。

カヤックではデータ活用やSREに興味のあるエンジニアも募集しています

業務中でも堂々とサボってSlackのtimes文化を満喫したい

(。ì _ í。) < この記事は 𝕂𝔸𝕐𝔸ℂ 𝔸𝕕𝕧𝕖𝕟𝕥 ℂ𝕒𝕝𝕖𝕟𝕕𝕒𝕣 𝟚𝟘𝟚𝟘 25日目の記事です




はじめに

どーも、こんにちは! 20年度新卒の色物枠のakipakaです、waiwai。 ぼくらの甲子園ポケットというソーシャルゲーム(通称:ぼくポケ)のサーバーサイドやっとります!!

備忘録として僕が新卒研修のときに作ったものについて書きたいと思います!

ところで僕は根がとても不真面目な人間なので、業務中にslackを眺めては無為に時間を過ごしサボっている訳ですが、入社1ヶ月と立たないうちにclubだのtimesだの余りにチャンネルに入り過ぎて手に負えないと言う自体に陥りました。


times 分報って何!?ってなった方にオススメの記事

https://giginc.co.jp/blog/news/slack-hunho


ひとまず原因を分析しながら小首を傾げて唸っていました。

なぜ追えないのか僕なりにその理由について考えてみました。


1. チャンネルの更新スピードが早すぎる、秒速5センチメートル位の更新頻度。
2. 一気に沢山のチャンネルに入りすぎた、人間関係を気にすると抜けられない豆腐メンタル。
3. 僕がわるいんじゃあない、みんなのtimesが面白過ぎるのだ。


結論:やっぱり1つのTLで見たいときに見ればいいtwitter最高なんだなぁ。


そこで一つの考えにたどり着く、slackをtwitterみたいに1つにまとめちゃえばいいぢぁん。

""破壊的な発想""

頭の中のジャニー喜多川が言う。「YOUやっちゃいなよ」

んで新卒研修のときに一緒に作りたいと言ってくれたmiyabin君とともに実装に取り組みました。主にakipakaがDB周りのAPI実装(akipakaAPI)を、miyabinがIO周りのAPI実装(miyabinAPI)に取り組みました。研修の1週間だけでなくちょいちょいプライベートでも取り組み、約1ヶ月弱で以下のようなものを実装することが出来ました。

概要


f:id:mamepon2580:20201224103914p:plain


TimesLineという、連携されたtimesにあった投稿を同期して、1つのchannelに表示できるアプリケーションを作りました。アイコンなどは固有のuserのものになっていますが実際はbotが投稿しています。timeslineでは名前の横に投稿された元のtimes名が表示されます。>マークをクリックすると元のtimesでの投稿に飛ぶことが出来ます。

f:id:mamepon2580:20201224103855p:plain

こんな感じ

技術構成

使った技術はこんな感じです。当時はあんまりDocker慣れしてなかったのでVM使ってローカルに立ててそれをherokuに乗っけてデプロイしました。PostgreSQLの設定に苦しめられた苦い思い出。SlackのAPIをゴリゴリ使い倒して作ってます。

f:id:mamepon2580:20201224103905p:plain

DBの設計

めっちゃ簡素でslack上にはchannel同士を連携するrelationとpost,threadなどの投稿を同期するpost_relation、thread_relationから出来ています。それぞれがchannelの中にpostが、postの中にthreadがあるのでそれぞれ1:nの関係になっています。

f:id:mamepon2580:20201224103902p:plain

機能

機能は大きく分けて3つあります

  1. channel同士を連携する機能
  2. 連携されたchannelの投稿を同期する機能
  3. 同期されたpostの中のthreadを同期する機能

またmessegeとpictureなどを投稿でき、それぞれにpost(投稿)、edit(編集)、delete(削除)の同期を実装しています。

timesをtimeslineへ連携する機能

#times ⇒ #timesline​
channel,

timesからtimeslineへの投稿

#times ⇒ #timesline​
post, thread,

timeslineからtimesへの投稿

timesline ⇒ #times
thread,

投稿できる内容

messege, picture

投稿に対する処理

post, edit, delete

よく見るとtimesline ⇒ timesではthreadしか投稿できるようにしていないのですが、これは色んな人が入り混じってるtimeslineの投稿が自分のtimesに流れて来ないようにする為です。ただしthreadはpostに対してつくものなので、自分のtimesのpostと連携されたtimeslineのpostにthreadがついた場合にのみ元のtimesのpostにも投稿するという形をとっています。難しい事を言っている。

設計

機能が多いので3つだけ設計の詳細を説明したいと思います。

分岐部分

slackから受け取った情報を受け取り処理によって分岐させる部分の設計です。ここでchannel同士を連携するbotコマンドやpost,threadに対するpost,edit,deleteなどの処理を分岐しAPIに渡していきます。

f:id:mamepon2580:20201224103856p:plain

timesからtimeslineへのpostの同期

連携されたtimesでpostの投稿が行われ発火が起こるとそれを分岐にかけて分別し、relation/searchのAPIにid渡します。そこで連携されているすべてのtimeslineのidを探し出し、それぞれにtimesline/throwに渡し投稿させていきます。その後でtimeslineの投稿されたpostのidを取得し、timesのpostとtimeslineのpostのidをDBに記述します。

f:id:mamepon2580:20201224103849p:plain

timeslineからtimesへのthreadの同期

こちらはtimesからtimeslineのpostの同期と比較するとかなり複雑です。timeslineからtimesへのthreadの同期なので子→親→子のように同期していく必要があるからです。ここではtimeslineのidとtimeslineに投稿されたpostのidでtimesのidとpostのidを見つけだしtimesに投稿します。そして、元々投稿のあったtimesline以外のtimeslineにも同期させます。

f:id:mamepon2580:20201224103911p:plain

連携されたtimeslineでthreadの投稿が行われ発火が起こるとそれを分岐にかけて分別し、post_relation/detectのAPIにid渡します。そこで連携されているtimesのidを探し出し、それぞれにtimes/throw_threadに渡し投稿させていきます。その後でtimeslineの投稿されたthreadのidを取得し、timesのthreadとtimeslineのthreadのidをDBに記述します。そのままpost_relation/syncを叩き「timesからtimeslineのpostの同期」と同じように他のtimeslineにもthreadを投稿していきます。

f:id:mamepon2580:20201224103852p:plain

まとめ

使ってみて最高かよって感じ。いろんなチャンネルピコピコ光ってるけどぶっちゃけtimeslineだけ見てれば全部追えるし、入ってるtimesで話してる内容にもリンク機能で飛んでいけるのでwaiwai絡んでいけるし。すばらしみ。今回は実験的に作った感じでherokuのDBいっぱいになって使えなくなっちゃったけど、作り直してtimeslineライフ満喫してQOL上げてきたいです!

チーム開発においてだと、設計は僕が作りましたがSlackのAPIの調査などはmiyabinに任せきりだったので、上手く連携を取りながら分担して進められたかなと思います。実際今配属されているぼくポケの開発でも、ここまで役割分担を気にしなけれいけないことはそれほどないのでいい経験できたかな。そして何より、2人でこういう機能があったらいいねっていう機能のアイデアを出してそれいいねって言い合えるのがすごく楽しかった。一緒に作ってくれてmiyabinありがとう。感謝。