Amazon SQSを利用してAmazon S3からGoogle BigQueryにデータ投入するBQinというツールを書いた

こんにちは。技術部の池田です。

この記事では、AWSを使っているプロジェクトではありがちなAmazon S3からGoogle BigQueryにデータを投入するためのツールを書いた話をします。

BQin - BigQuery data importer with AWS S3 and SQS messaging.

名前からお察しの方もいらっしゃるとは思いますが、BQinは弊社藤原Rinから着想を得ています。 このツールは一言で表すと、データ投入先がRedshiftからBigQueryに変更されたRinです。 プロダクションに投入し1ヶ月以上になりますが、深刻な問題は発生せず動いております。

開発動機的な話

とあるプロジェクトでAWS S3にデータが投入されるから、GCP BigQueryへデータを転送したいという話がありました。 はじめのうちは、Cloud ComposerBigQuery Data Transfer Serviceなどの利用を検討していました。

  • GCPは現状BigQueryしか使っていない。
  • 何かしらの動くものは外形監視などが整っているAWSに寄せたい。

等々の大小細々とした理由から、S3のからの通知でデータをBigQueryに転送するなにかを作ることになりました。 初期構想では、Google Cloud Platformのbqコマンドやgsutilコマンドをlambdaで実行しようとしてました。 しかし、

『 要するにRinのbq版なんだよねこれ… 』by 藤原

の一言で、時間に余裕もあり作ってしまいました。

アーキテクチャー的な話

基本的なアーキテクチャーはRinと同じような感じです。

f:id:ikeda-masashi:20200601113925p:plain
BQinアーキテクチャー

動作の流れとしては以下のようになっております。

  1. S3に(何者かが) データを保存する
  2. SQS に S3のpath等が記述されたメッセージ(s3:CreateObject:*)が通知される。
  3. BQinがSQSメッセージを受信して次の処理を行う。

    • S3からデータを読み込み、Google Cloud Storageの一時バケットに転送する。
    • BigQueryのLoad Jobを作成し、データのLoadが完了するのを待つ。
    • Google Cloud Storageに転送したデータを消す。
  4. SQSのメッセージを削除し、データの処理を完了したことにする。

Go言語で制作されているので、バイナリダウンロードしS3とSQSの設定をした上で以下のような config を用意すれば利用できます。

config.yaml

queue_name: my_queue_name    # SQS queue name

cloud:
  aws: #awsのcredentialは省略した場合、インスタンス等から読み取るのもOK。
    region: ap-northeast-1
    access_key_id: {{ must_env "ACCESSS_KEY_ID" }}
    secret_access_key: {{ must_env "SECRET_ACCESS_KEY" }}
  gcp: #gcpのクレデンシャルはOfficalのGOOGLE_APPLICATION_CREDENTIALSの環境変数で与える形でもOK
    base64_credential: {{ must_env "GCP_CREDENTIAL_BASE64_JSON" }}

s3:
  bucket: bqin.bucket.test
  region: ap-northeast-1

big_query:
  project_id: bqin-test
  dataset: test

option:
  temporary_bucket: my_bucket_name # データの一時的な置き場
  gzip: true
  source_format: json # [csv, json, parquet] から選択
  auto_detect: true # works only csv or json

rules:
  - big_query:
      table: foo
    s3:
      key_prefix: test/foo

  - big_query:
      project_id: hoge
      dataset: bqin_test
      table: role
    s3:
      bucket: bqin.bucket.test
      key_regexp: test/([a-z]+)/([a-z]+)/.+\.csv
    option:
      gzip: false
      source_format: csv

起動コマンドとしては、以下で動きます。

$ bqin run -config config.yaml

Fargateで使いやすいように、Credential周りの設定は以下のような工夫があります。

  • AWSはconfigで省略した場合はEC2のinstance-roleやECSのtask-roleをから利用するようになってます。
  • GCPはSSMのパラメータストアからECSで取り扱いやすいようにcredentialのjsonをbase64エンコードしたものを取り扱えるようになっています。

どんなときにBQinを利用すると嬉しい?

GCPのサービスにはAWS S3からのデータ転送をサポートしてくれる物があるので、BQinの出番はあまり多くないような気もします。 しかし、以下のような場合はBQinが役に立つかもしれません。

S3のデータ投入のタイミングで取り込み開始したい場合

GCPのサービスの多くは、スケジュール実行のものが多いです。BQinはSQSを使っているのでS3からのPush型でデータの取り込みができます。 先日発表されたAmazon AppFlowなどもありますし、dataを貯めるのはS3で高度な分析等だけはBigQueryを利用したいというケースはあるかもしれません。 弊社の場合、Mobile AppのログはFirebaseに、Server AppのログはAmazon S3に保管されるケースがありがちです。FirebaseからはログをBigQueryへエクスポートできるので、Server AppのログもS3からBigQueryに転送したいというニーズがあります。

GCPはそれほど利用していない。

Google Cloud Platformを多用している場合は、terraformなどでの管理体制が確立していることがあります。その場合、Cloud ComposerやGKEなどを新しく利用することに腰の重さは感じないでしょう。しかし、『 BigQueryだけ使いたい 』『Spreadsheetと連携したい』等の局所利用にとどめてるケースではAWS側でデータ投入機構を管理できるのはメリットだと思います。

おわりに

データ分析基盤を整える周りの話でAWS-GCP間のデータ転送の話はよくある問題だと思います。 何を使うのが良いのかはケースバイケースだと思いますが、選択肢の一つにBQinを考えてみるのはいかがでしょうか? 今回は、Amazon S3からGoogle BigQueryへの転送を助けるツールを作ってみましたという話でした。

カヤックでは、ちょっと困った事が起きたときにツールを作るエンジニアも募集しています

平均FPSを楽に近似する

こんにちは。技術部平山です。

画像をクリックするとサンプルのwebGL実装に飛びます。

この記事では、ゲーム等々で平均のFPS(あるいはフレーム所要時間)を楽に近似する方法についての 思いつきを書きます。

思いつきですし、元々そんなに大変でもないことなので、大した価値はありません。

提案手法

以下の感じで平均FPSを近似します。Unityである必要もC#である必要もなく、 さらにはコードである必要もないですが、数式よりコードで書く方がピンと来ますよね?

public class Fps{
    public float fps{
        get{
            return 1f / avgTime;
        }
    }

    const float k = 0.05f;
    float avgTime;

    public void Update(){
        var dt = Time.deltaTime;
        avgTime *= 1f - k;
        avgTime += dt * k;
    }
}

本筋に関係ないクラス定義その他で長くなっていますが、本質はこれだけです。

avgTime *= 1f - k;
avgTime += dt * k;

kが0.1ならば、前の値を90%に減らして、新しい値を10%足します。 汚れた水槽の水を少し捨てて、減った分を足す、みたいな感じです。

動機

fpsを表示する際に、そのフレームの値だけから求めて表示すると、負荷の変動によって 著しく値が変わって見辛くなりますよね? 画面の数字があまり目まぐるしく変わると、「だいたいいくつくらいなの?」 というのがわかりにくくなるのです。

そこで、「表示だけ1秒に1回しか更新しない」という工夫もあるのですが、 たまたま表示した値が平均から外れていると、 それはそれで惑わされます。 「60.1、59.7、60.3、55.3...え、55?!」みたいな。

さらに見やすくするには、値の変化をなだらかにすれば良く、 それには平均するのが一番素直です。 例えば「最近60フレームの平均」を出すわけですね。 しかしそれには、最近60フレームの時間を取っておかねばなりません。

float[] times = new float[60];
int index;

みたいなのを用意して、indexを増やしながらフレームの所要時間を毎フレーム格納し、 毎フレーム平均を計算しなおします。 60回のループを回すくらいは屁でもない負荷ですが、 配列を定義するのもループを書くのも面倒くさいのです。

「同じようなことを配列もループもなしでできたらいいのに」と15年以上思っていましたが、 真面目に考えることもなく今日に至ってしまいました。

指数関数で近似する

今回思いついた「現在値を一定比率で減らしてから新しいのを足す」というのは、 平均を「指数関数を窓関数とした畳み込み」に置き換えることです。

平均の場合、例えば60フレーム以上古くなれば影響はゼロになりますが、 今回の場合はどんなに古いデータであっても影響はゼロにはなりません。 また、平均の場合60フレームに含まれるフレームは全て同じ価値で足されますが、 今回の方法だと、新しいものほど影響が大きく、古いものは影響が減ります。

しかし、「要するになだらかになればいいのだ」と考えれば、 実用上大した問題もありません。

私は技術ブログその他で何かと小さいサンプルを量産するわけですが、 その度にFPS表示を作るのは面倒です。 作っておいたクラスをつっこめば動くのでそれでもいいんですが、 .csファイルを足すことすら面倒だったりします。サンプルも大きくなりますし、 多数のサンプルプロジェクトに同じファイルのコピーが存在するのも気持ち悪いでしょう。

しかし今回の手法なら変数を1個置いて、2行で値の更新ができます。

スパイクも表示したい

さて、平均FPSはこれでいいのですが、 私は真面目に性能を調べる時には、「最近60フレームの最大フレーム時間」も画面に表示するようにしています。 いわゆる「負荷スパイク」です。

これも面倒でして、配列に時間を取っておいて、一番大きいものを毎フレーム探す処理が必要になります。 平均を出すために元々ループがあれば、そのついでにやるのですが、 今回の手法でループがなくなってしまったので「ついで」になりません。

どうにか、こいつもループなしで似たようなことができないでしょうか?

雑で良ければできる

できます。こんな感じです。

public class Spike{
    const float k = 0.05f;
    float sqSpike;

    public float spike{ get{ return Mathf.sqrt(sqSpike); } }

    public void Update(){
        var dt = Time.deltaTime;
        sqSpike *= 1f - k;
        sqSpike += (dt * dt) * k;
    }
}

ミソは二乗していることです。

sqSpike *= 1f - k;
sqSpike += (dt * dt) * k;

フレーム時間を二乗したものを足していき、値が欲しいと言われたら平方根を返します。 二乗すると、大きな値の寄与が大きく、小さな値の寄与が小さくなります。 例えば、1と10の平均は11/2=5.5ですが、2乗してから足すと 1*1+10*10=101で、この平均の平方根はsqrt(101/2)=7.1となります。 1の影響より10の影響が強く出ていますね。

このため、「ずっと16msだったけど、一回200msのスパイクが出た」 というようなことがあれば、200msの影響が強く出て、しばらくそれが残ります。 表示される値はどんどん減っていくので「何msのスパイクがあったのか?」 はわかりませんが、少なくとも「スパイクがあった」 ということはわかるわけです。そして大抵はそれで十分です。

なお、実際これを使ってみたところ、私の感覚では2乗ではまだ足りない感じがあります。 8乗くらいしちゃってもいいでしょう。 その場合、値を使う時には8乗根を求めます(pow(x, 1f / 8f))。 累乗する値が大きくなるほど、大きな値の寄与が大きくなるので、 よりスパイクが見えやすくなるのです。 例えば先程の1と10の例ならば、3乗にすると1*1*1 + 10*10*10 = 1001で、 2で割って3乗根を取れば7.9になります。より10の寄与が大きくなりました。

ただし、あまり累乗の値を大きくすると、 値への影響が長く残りすぎますし、演算誤差の問題もありますので、 ほどほどが良いでしょう。

deltaTimeについて注意

Time.deltaTime を使うと、Time.timeScaleTime.maximumDeltaTime の影響を受けて本当の時間からズレることがあります。

若干面倒になりますが、 Time.realtimeSinceStartup を使ったり、 System.DateTime を使ったりして、実時間で計算した方が無難でしょう。

サンプル

今回のサンプルは、累乗の値や、「どれくらい前の値が薄まるか」を決めるkを 調整して、使い勝手がどう変わるかを見るためのものです。 「100ms Spike!」と書かれたボタンを押すと100msのスパイクが起こるので、 スパイクを表す赤いグラフがグンと伸びて、平均を表す緑のグラフと 差が広がる様が見られます。 サンプルで「coeff」とあるのは、本文中のkです。 これが大きいほど、早く古い値の影響が小さくなって、新しい値に敏感になります。 小さくするほど長い時間の平均に近い値が見られるわけです。 個人的な感覚としては、0.01から0.05あたりが良いかと思います。

おわりに

私にとって具体的なので「FPS表示」という狭い話題にしましたが、 時間変化する値をサンプリングして、その変化を眺めたい、 というような場合、この手法はそのまま応用できます。

全サンプルを用意する必要がなく、逐次更新できるので、 サンプルの数がべらぼうに多い場合などは価値が大きいかもしれませんね。