Redshift Streaming ingestionでKPLによる集約がされたレコードを読みたい。

この記事はAWS Analytics Advent Calendar 2022の25日目(裏)の記事です。
こんにちは、SREチーム所属の@mashiikeです。

AWS Analytics Advent Calender 2022の主催である @jostandardさんから、以下のようなリアクションいただいたので急遽、裏の記事として書いてしまいます。

今回は、先日GAになったRedshift Streaming ingestionに関する話です。

Redshift Streaming ingestion

簡単に言うと、ニアリアルタイムにKinesisのデータをRedshiftに取り込むことができる機能です。

さっくりと、getting-started に従って作っていきます。

Kinesis Data Streamを作って、IAM Roleを作って、IAM Roleを関連付ける

そして、External Schemaを作って、Materialized Viewを作ります。

DROP SCHEMA IF EXISTS source__kinesis CASCADE;

CREATE EXTERNAL SCHEMA IF NOT EXISTS source__kinesis
FROM KINESIS
IAM_ROLE 'arn:aws:iam::0123456789012:role/redshift-kinesis';

CREATE SCHEMA IF NOT EXISTS staging__kinesis;

DROP MATERIALIZED VIEW IF EXISTS staging__kinesis.view_all;

CREATE MATERIALIZED VIEW staging__kinesis.view_all
AUTO REFRESH YES 
AS
SELECT *
FROM source__kinesis."redshift-streaming-test";

ここまで来たら、Kinesis Data Streamになにか流すのですが、弊社の場合はECS Fargateのログを FireLens経由でKinesisに送るので、fluentdからpingを送ることにします。

FROM fluent/fluentd:v1.15.1-debian-1.0
USER root

RUN apt-get update \
        && apt-get install -y curl \
        && gem install \
        fluent-plugin-kinesis:3.4.2 \
        fluent-plugin-ping-message:1.0.0 \
        --no-document \
        && gem sources --clear-all \
        && apt-get purge -y --auto-remove $buildDeps \
        && rm -rf /var/lib/apt/lists/* \
        && rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem

RUN mkdir -p /fluentd/etc/conf.d
COPY *.conf /fluentd/etc/conf.d/

USER fluent

ENV TZ=JST-9
RUN ["fluentd", "-c", "/fluentd/etc/conf.d/fluent.conf", "--verbose", "--dry-run"]
CMD ["fluentd", "-c", "/fluentd/etc/conf.d/fluent.conf"]

こんな感じでDockerfileを書いて、configは以下のようにします。

<source>
  @type ping_message
  @label @heartbeat_events
  interval 5s
  data     "this is ping message"
</source>

<label @heartbeat_events>
  <match ping>
    @type kinesis_streams_aggregated
    region ap-northeast-1
    stream_name redshift-streaming-test
  </match>
</label>

docker-compose.yamlは以下のような感じです。

version: "3.5"

services:
  fluentd:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: "fluentd"
    environment:
      - 'AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID'
      - 'AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY'
      - 'AWS_SESSION_TOKEN=$AWS_SESSION_TOKEN'
    volumes:
      - ./fluentd/log:/fluentd/log

検証なので、一旦AssumeRoleして手に入れたenvを渡す感じでいきます。

さぁ、これでPingを送ってみます。

REFRESH MATERIALIZED VIEW staging__kinesis.view_all;
SELECT  *
FROM staging__kinesis.view_all
ORDER BY approximate_arrival_timestamp DESC;

… なにこれぇえええ!!?

あっ、そうでした。kinesis_dataはvarbyteでやってくるんでしたね。

SELECT  *, from_varbyte(kinesis_data, 'utf8') as kinesis_data_text
FROM staging__kinesis.view_all
ORDER BY approximate_arrival_timestamp DESC;

あるぇええええ!?

ということで前振りが長かったですが、本題です。

KPL(Kinesis Producer Library)で集約されたレコードを読みたい。

冒頭で読めなかったこのデータ、実はKPLと呼ばれるライブラリで複数のレコードが集約されたデータだったのです。

docs.aws.amazon.com

Kinesisの制約の一つに、1レコード最大1MBで1シャードあたり1000レコード/secというものがあります。
( クォータと制限 - Amazon Kinesis Data Streams )

KPLは、この制約のもとでKinesisを効率よく利用するために1レコードに複数のログを集約する事ができます。 1ログ1レコードで書き込むと、先程の1シャードあたりの書き込みレコード数の制限に当たりやすいということがあります。 1つのログは数百byte程度のことが多いので、1MBの書き込み制限をギリギリまで使って複数のログを1レコードに集約して、書き込みレコード数を減らせるととても嬉しいです。

Fluentd では @type kinesis_streams_aggregated ではなく、 @type kinesis_streams を使うことで、1ログ1レコードにできます。 しかし、先程の1シャードあたりの制限の関係もあり、無駄なく効率良く使うために実運用では @type kinesis_streams_aggregated を使いたいという背景があったりします。

さて冒頭の状況を解説すると、Redshit Streaming ingestionでKPLで集約されたレコードを読んだ場合、集約されたレコードがそのままのバイナリデータとしてくるため、utf8の文字列として解釈できず変換することができなかったというわけです。
もちろん、先程の背景もあるため、 KPLで集約しないという選択肢は取りづらいという状況になります。 コレは困ったことになりました。

そこで、解決手段として登場していただくのが、Lambda UDFです。
SQLでバイナリデータを操作するのはとても大変ですので、ソレを得意な言語に処理を委譲してしまおうというわけですね。

Go言語でKPLの集約をDe-aggregateする例は、弊社の藤原作 kinesis-tailfがあります。 さらに、Lambda UDFを作りやすくするハンドラライブラリ gravita を作成していたので、目的のものはさっくりと作れてしまいました。 それがこちらです。

github.com

実際に使ってみます。Lambda関数のdeployは lambda/ に纏めておきました。 providorやbackendの設定をするconfig.tfを以下のようにしておけばいい感じにterraform applyができます。

terraform {
  required_version = "~> 1.2.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "= 4.17.1"
    }
  }
  backend "s3" {
    bucket = "terraform-example-com"
    key    = "redshift-udf-kpl-deaggregate/terraform.tfstate"
    region = "ap-northeast-1"
  }
}

provider "aws" {
  region = "ap-northeast-1"
}

lambdaのデプロイはKAYACではおなじみのlambroll を使います。

$ make terraform/plan
$ make terraform/apply
$ make lambroll/deploy

こうしてできたIAM RoleとLambda関数を利用して次のようにLambda UDFを定義します。

CREATE OR REPLACE EXTERNAL FUNCTION udf_kpl_deaggregate(varchar(max))
RETURNS varchar(max)
IMMUTABLE
LAMBDA 'redshift-udf-kpl-deaggregate'
IAM_ROLE 'arn:aws:iam::012345678910:role/lambda-udf-redshift';

この Lambda UDFを利用して次のようにMaterialized viewを作成します。

DROP MATERIALIZED VIEW IF EXISTS staging__kinesis.view_all;

CREATE MATERIALIZED VIEW staging__kinesis.view_all
AUTO REFRESH YES 
AS
SELECT 
    approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    JSON_PARSE(udf_kpl_deaggregate(from_varbyte(kinesis_data,'hex'))) as kinesis_data,
    refresh_time
FROM source__kinesis."redshift-streaming-test"
WHERE is_valid_json_array(udf_kpl_deaggregate(from_varbyte(kinesis_data,'hex')));

これでクエリを実行すると以下のようになります。

これでKPLで集約されたレコードを無事読めるようになりました。

実は… 実用的ではない。 VARCHAR(MAX)の壁

無事にKPLで集約されたレコードを読めるようになりました。 しかし、勘の良い方はすでにお気づきかもしれませんが、実はこの解決方法は実用できません。 その理由として、VARCHARのサイズ上限の問題と、Lambda UDFとPython UDFで直接VARBYTEを使用できないという話があるからです。

まずはじめに、VARBYTEのサイズについて見てみましょう。

VARBYTE 型 - Amazon Redshift

最大バイト数 (n) の範囲は1~1,024,000 です。デフォルト値は 64,000 です。

Redshift Streaming ingestionで渡されてくる kinesis_dataは最大で1MBほどのサイズを持つVARBYTEが来ることを想定して良いでしょう。(この数値は Kinesis Data Streamの1レコードの書き込み制限である)

一方で、VARCHARのサイズについて調べてみます。

文字型 - Amazon Redshift

どうやら、64KB-1 のようです。

実は、 from_verbyte(kinesis_data, 'hex') で VARBYTEをVARCHARに変換してもVARBYTEが大きすぎる場合はうまくいかないようです。

hexで、あふれてしまうならbase64 にすればいいのでは? そう思った方、残念ですが現時点でのRedshiftではbase64への変換方法は標準では提供されていないようです。 以下のようなPython UDFを定義したら? と考えた方もいるでしょう。

create or replace function udf_base64encode (a varbyte(1024000))
returns varchar(max)
immutable
as $$
    import base64
    if a is None:
        return None
    return base64.urlsafe_b64encode(a).rstrip('=')
$$ language plpythonu;

残念ながら、先程のVARBYTEのドキュメントには以下のようにあります。

Amazon Redshift で VARBYTE データ型を使用する際の制約事項

Python または Lambda ユーザー定義関数 (UDF) では VARBYTE データ型を使用することはできません。

現時点でLambda UDFやPython UDFはVARBYTE型を取り扱うことができません。

また、例えばVARBYTEをSUBSTRING等で32分割したとしても、集約を解除したレコードは1MBにふさわしいサイズになっているので、そちらでもVARCHAR(MAX)のサイズ上限の問題にぶつかります。 ですので、実はこのLambda UDFによるKPLで集約されたレコードのDe-aggregateは、実用できないというわけです。

そこでご登場いただくのが、AWS Support様です。

AWSでは、サポートセンター経由で機能リクエストのサポートケースを開くことができます。

今回の件を元に、2つの機能リクエストに関するサポートケースを開きました。

1つはfrom_varbyteやto_varbyteでbase64エンコーディングをサポートする もしくは、varbyte型に関してLambda UDFやPython UDFで取り扱えるようにする。 もう1つはRedshift Streaming ingestion の機能としてKPLで集約されたレコードのDe-aggregateをサポートしてほしいというものです。

このようにして、検証の結果困ったことが起きたら、サポートケースとして機能要望を送ることができます。 おそらくですが機能要望が多いものほど重要視されると思いますので、皆様もこういう機能がほしい、ああいう機能がほしいとなったら、積極的にユースケースを添えてサポートケースを開いてみてはどうでしょう。

おわりに

今回は、Redshift Streaming ingestionを実運用に乗せるために検証を行った結果、KPLによって集約されたレコードがそのままくるので、どうにかして集約前のレコードを読みたいというものでした。
そして、Lambda UDFによる解決を試みて、VARCHAR(MAX)のサイズ上限の由来の問題にあたったので、サポートケースを2つ開いて機能要望を送りました。
皆様も、新しい機能の検証の結果、『こういう機能がほしい!』となったことはありませんか? そういった場合は、ユースケースを添えて機能要望のサポートケースを開いてみましょう。
他にも同様の声が大きければ、思ったより早くやってくるかもしれません。

カヤックでは、Redshiftが大好きなエンジニアも募集しています。

hubspot.kayac.com