CloudWatch + SQS でバッチサーバー冗長化のために実装した「sqsjkr」の話

Lobiの吉村(moulin)です。今回はCloudWatch + SQSのバッチサーバ冗長化のために作成した「sqsjkr」についてご紹介します。また、sqsjkrはGo実装です。

github.com

目次

  1. 背景
  2. sqsjkrについて
  3. 運用について

背景

バッチサーバーとは、マシンリソースの消費が高い処理を定期的に実行させたい場合、本体アプリケーションに影響が及ばないようにするために建てられるバッチ処理専用のサーバーを指し、Lobiでも様々なバッチ処理を実行するサーバーを建てています。 バッチサーバーはcrontabが設定されているものが1台、バックアップとして同じ構成でcrontabが設定されていないものが1台起動しており、cronを実行するサーバーに障害が発生した場合は、手動でcrontabをバックアップのサーバーに設定することでフェイルオーバーする運用をしていました。 しかし、バッチサーバーの処理単位は比較的大きいものや、ビジネス的数字の計算もしています。 障害が発生して復旧作業が遅れたりすると非常に困るため、冗長構成が課題となっていました。

今回、crontabの移行が比較的容易になりそうという理由でAWS CloudWatch Events - Schedule1を利用する構成で考えました。 他の案ではrundeckAWS Datapiplineもありましたが、冗長化の設定に手間がかかりそうなのと、crontabからの移行が大変になりそうだったのでやめました。crontabの冗長化も同様の理由で候補から外しました。

  1. AWS CloudWatchのスケジュール実行からSQSにメッセージを送信
  2. SQSのメッセージを取得し、メッセージに定義されているジョブを実行

sqsjkrの冗長化構成

2の「メッセージに定義されたジョブを実行する部分」にあたるのがsqsjkrです。

sqsjkrについて

sqsjkr = SQS Job KickeR の略称名で、先に述べたとおりSQSのメッセージを取得して、その中に定義されたジョブを実行するためのものとなります。 複数台バッチサーバー(AWS EC2の1インスタンス)に各々sqsjkrを起動させておくだけで、冗長化の構成が容易に構築できます。

sqsjkrの概要図

sqsjkrのジョブの定義は次のようなJSONで定義されています。

{
    "command": "perl -Ilib script/cron/aggregate_ad_log_daily.pl 2>&1",
    "env": {
        "PATH": "/usr/local/bin/:/usr/bin/:/sbin/:/bin",
        "MAILTO": "example@example.com"
    },
    "event_id": "aggregate_ad_log_daily",
    "lock_id": "aggregate_ad_log_daily",
    "life_time": "23h",
    "abort_if_locked": false

}
  • commandは実行するコマンド
  • envはcommand実行時に渡す環境変数
  • event_idはcloudwatch eventのevent id
  • lock_idは排他制御で設定するID
  • life_timeは実行待ち状態となったジョブの生存期間(詳しくは後述)
  • abort_if_lockedは既にjobがロックされていた場合に直ぐ諦めてジョブをキャンセルするフラグ

このジョブ定義をcloudwatch eventのinputフィールドに入れます。

重複メッセージと排他制御

重複メッセージ処理

SQSは稀にメッセージが重複して取得されることがあるため、重複したメッセージを取得した際は破棄する必要があります。 sqsjkrでは、メッセージのIDをキーとしてDynamoDBに値を追加(Set)していきます。Setする際に既にキーが存在する場合に重複していた場合はエラー(ConditionalCheckFailedException)が返ってくるため、そのエラーで重複チェックをしています。

また、sqsjkrを開発していた頃はDynamoDBに有効期限(TTL)の設定ができなかったため、SetするときにExpiredの値も含めて、有効期限切れをチェックしています。 Expiredを過ぎているデータを定期的に取得し、それらのデータを削除するクエリを実行しています。

排他制御

ジョブが同時実行されてしまうと不整合が発生してしまう場合、排他制御が必要になります。 sqsjkrでは、ジョブ定義のlock_idで排他制御ができます。 デフォルトでは排他制御でもDynamoDBを利用します。 lock_idをキーにし、重複メッセージチェック時同様にSetのエラーでチェックします。 ジョブの実行が完了したタイミングでDynamoDBからデータを削除します。

今までは、D.J.Bernsteinのsetlockで上記の例のケースに対応していましたが、今回の構成は複数台構成となるためsqsjkr側でsetlockと同様の排他制御機能を作る必要がありました。

重複メッセージと排他制御で使用するバックエンドは実装者が自由に選択できるようThrottlerとLockerというinterfaceにしてあります。 別のものを利用したい場合はThrottlerとLockerを実装して対応できます。

LifeTimeについて

LifeTimeの値はSQSにメッセージが詰め込まれた時間と現在時刻の差と比較され、 その差がLifeTimeよりも大きい場合にジョブがキャンセルされるようになっています。

e.g. (現在時刻 11:00) -  (10:00に詰めた) = 1h > life_time のときキャンセル

LifeTimeを1hに設定していた場合、ジョブを詰めてからメッセージを取り出すのに1時間以上経過していたらキャンセルされます。

LifeTimeの設定は、ジョブが実行される時間帯が決められている場合などに有効な設定です。例えば、毎時間に1時間前の何かの集計をするバッチ処理が予想以上に時間を要して、2時間以上掛かったとします。その場合、次の時間の実行で意図しない時間帯の結果が集計されてしまいます。

LifeTimeが1hに設定されていると、1時間以上待ち状態になったジョブになるためキャンセルされて、間違った結果を集計しないで済みます。また、sqsjkrのlife_time_triggerでLifeTimeが過ぎた契機で実行するコマンドを設定できます。

sqsjkrの使い方

sqsjkrを取得します。

$ curl -O -L https://github.com/kayac/sqsjkr/releases/download/v0.2.1/sqsjkr-v0.2.1-linux-amd64.tar.gz
$ tar xzf sqsjkr-v0.2.1-linux-amd64.tar.gz
$ mv sqsjkr-v0.2.1-linux-amd64 sqsjkr

設定ファイル(config.toml)を用意します。

[account]
profile = "AWSのプロファイル"
id = "awsのアカウントID"
region = "ap-northeast-1"

[sqs]
queue_name = "AWS SQSで作成したqueueの名前"

[kicker]
max_concurrent_num = 5
life_time_trigger = "life_timeを過ぎた時に発火するコマンド"
stats_port = 8061

confを指定して、sqsjkrを起動します。

./sqsjkr -c config.toml

バイナリリリースのsqsjkrでThrottlerとLockerを利用するにはlock-tableオプションでDynamoDBのテーブル名を指定する必要があります。

./sqsjkr -c config.toml -lock-table <DynamoDBのテーブル名>

DynamoDBのテーブル設定:

  • テーブルのフィールド
    • Id(プライマリ): String
    • Type(ソートキー): String
    • Expired: Int
  • インデックス
    • 名前: TypeExpiredIndex
    • タイプ: GSI
    • パーティションキー: Type
    • ソートキー: Expired

別のThrottlerとLockerを実装したい場合はsqsjkrのパッケージを使ってバイナリを作ります。 以下、sqsjkrパッケージを使った場合の簡単な例になります(README.mdにも記載されております)。

package main
import (
    "context"
    "log"

    "github.com/kayac/sqsjkr"
)

func main() {
    // config
    conf := sqsjkr.NewConfig()
    conf.SetAWSAccount("aws_account_id", "aws_profile", "ap-northeast-1")
    conf.SetSQSQueue("your sqs queue name")

    // SetKickerConfig first argument is a concurrency, the second argument
    // is the trigger command when to expire job's life time.
    conf.SetKickerConfig(5, "echo trigger")

    // run sqsjkr
    ctx := context.Background()
    sjkr := sqsjkr.New(conf)
    if err := sqsjkr.Run(ctx, sjkr, "debug"); err != nil {
        log.Println("[error] ", err)
    }
}

運用について

terraformでの管理

Lobiでは、CloudWatch Event - Scheduleの一つ一つのルールをterraformで管理しています。 元々crontabにかかれていたものをperlでparseして、tfファイルに変換しました。

以下、tfのサンプルです。

resource "aws_cloudwatch_event_rule" "sync_host_members" {
  name = "sync_host_members"
  description = <<COMMENT
リカバリー手順: ....
COMMENT
  schedule_expression = "cron(*/10 * * * ? *)"
}

resource "aws_cloudwatch_event_target" "sync_host_members" {
  rule  = "${aws_cloudwatch_event_rule.sync_host_members.name}"
  arn   = "${aws_sqs_queue.cron_queue.arn}"
  input = <<EOF
{
  "command": "$RUNNER -- script/cron/sync_host_members.sh",
  "envs": ${var.cron_envs},
  "event_id": "sync_host_members",
  "lock_id": "sync_host_members",
  "life_time": "10m",
  "abort_if_locked": false
}
EOF
}

Lobiでは、descriptionにスクリプトのリカバリ手順を書くポリシーで運用しています。 sqsjkrのジョブの中身はinputに入れてあるJSONです。

ジョブ失敗の通知

Lobiではcronのコマンド実行失敗を検知するために horenso + consul kv dashboard2 の仕組みが整備されています。失敗時のアクションをsqsjkrが検知する必要はなかったため、その機能は今回実装しませんでした。なので、ここではLobiで実際に失敗検知をしている流れについて軽く触れておこうと思います。

cronの実行結果は、horensoを経由してconsul kv dashboardへ格納していきます。consul kv dashboardでは、dashboardの中を「category」という単位で通知情報をまとめることができ、category単位で通知レベルをリアルタイムで確認できるようになっています(categoryはweb uiのタブになっている部分)。 カテゴリの通知レベルが変わるとtriggerオプションで設定したコマンドを発火させることが出来ます。 そのconsul kv dashboardのtrigger機能を使ってSlackへ通知を流しています。

consul_kv_dashbaordの画面

cronの結果をslackへ通知

通知が飛ぶ流れ:

  • cronが一つ失敗する
  • dashboardのcronカテゴリの通知レベルが変更
  • slackに通知を飛ばすtriggerが発火

バッチ処理全体を停止させてリカバリ

メンテナンスでバッチ処理全体を確実に停止させる必要がある場合は、sqsjkrを止めます。 実行されなかったジョブはSQSに溜まるので、sqsjkrを再開すれば溜まっていたジョブが流れ出すため、手動で実行する必要はありません。 特定の時間帯で実行されるべきジョブがあり、sqsjkrを再起動したときジョブが実行されては困る場合、LifeTimeを適切に設定しておくとキャンセルできます。

バッチ処理全体を停止させる前に、

  1. sqsjkrの停止時間の見積もり
  2. LifeTimeが期限切れでキャンセルされるジョブの洗い出し

さえ出来ていれば、手動で再実行するべきジョブが把握でき、速やかにリカバリを実施することが可能です。

※ また、特定のバッチを停止させたい場合はCloudWatch Eventから止めることができます。

sqsjkrのworker監視

sqsjkrではジョブを実行するworker数(ジョブ同時実行数)を設定するため、workerが現在いくつ実行中なのかを監視して、 worker数が足りなかったりした場合に増やす必要があります。workerの情報は以下のGETで取得できます。

  • GET: /stats/metrics
{
  "busy_worker": integer,
  "idle_worker": integer
}

さいごに

SQSのFIFOがtokyoリージョンに対応したら重複メッセージのチェックが不要3になったり、 DynamoDBのExpireの設定が可能になった4ため簡略化できそうなどの話もあったりと、 今後の宿題も残っていますが、バッチサーバのSPOFを解消するものが無くてようやく作り終えることができて良かったという想いです。

バッチサーバー冗長化とか楽しそう!カヤックのエンジニアと一緒に働きたい!という方、いつでも募集中です!


  1. AWS Cloudwatch Event - Scheduleのルールのスケジュール式の構文

  2. consul kv dashboard は@fujiwaraが実装したconsulのkv storeをWeb UIで閲覧できるようにしてくれる便利なWeb Applicationです(詳細はこちら)。

  3. Amazon Simple Queue Service

  4. Amazon DynamoDB now supports automatic item expiration with Time-to-Live (TTL)