Go サーバーで外部 API 操作の原子性を頑張る

こんにちは、カヤック SRE の市川です。

このエントリは【カヤック】面白法人グループ Advent Calendar 2023の1日目の記事です。

はじめに

いきなり鬼のように私事ですが、結婚することになりました。

技術ブログとは全く関係ない話と見せかけて、婚約者が MySQL 互換な分散 DB の会社で働いていることが、想像していた以上に「データの整合性と向き合う機運」の高まりに繋がりました。

ということで今年は気合を入れて 気合いで DDIA 読破 Advent Calendar 2023 なるものを計画しているのですが、会社のアドベントカレンダーも運営担当をしているので、トップバッターやっていきます。

GopherCon Singapore での学び

個人的な話が連続して恐縮ですが年末なので(?)お許しください。

最近もう一つ大きなことを挙げると、11月の頭に GopherCon Singapore というイベントのために海を渡ってきました。

本当は会場のウォー!!って感じの写真があると良かったのですが、携帯にいい感じの画像がなく、代わりに「現地で仲良くなった GovTech(=デジ庁的な存在)の方々と後日ボルダリングに行った様子」を貼っておきます......

GovTech の方々とボルダリングを楽しんでいる様子

シンガポールのエンジニアも「イベントでもらった無料の Tee シャツ」ばっかり着回していることが、うっすらと感じ取れる素敵な一枚になっているかと思います。

Get the Compiler Involved

閑話休題、GopherCon Singapore は初日のワークショップと2日目のセッションといった2本仕立てとなっていたのですが、軽い気持ちで「せっかくだからワークショップから参加するか〜」と申し込んだら8時間の(当然だけど)完全英語セミナーを受講することになってしまい、大変疲れました。

内容としては、ソフトウェア開発およびコンサルティングや教育を手掛ける ArdanLabs というアメリカ拠点の会社から、たぶん一番偉い人っぽい William Kennedy さんが遥々お越しになり、以下のリポジトリを題材にサーバーサイド アプリケーション開発の包括的な手法・考え方を詰め込んで1日で教える、というものでした。

github.com

流暢な英語で Goroutine の G/M/P みたいな話から Kustomize から何から話題がガンガン進むので、「予備知識のおかげで辛うじて何を言わんとしてるか分かる」レベルの理解度だったのですが、特にアーキテクチャについて話しているときの以下の台詞が頭に残りました。

"Get the Compiler Involved"

これは教材となったリポジトリのアーキテクチャを代弁するような文句で、DDD*1 でいうところの集約を跨いだトランザクションや循環参照を防ぐために Go の言語制約を上手に使う(=コンパイラを関与させる)というものでした。

新しい試み: 原子性との向き合い

ところで自分は今 Google Cloud を用いたモバイルゲームの受託開発に関わっており、現時点で大きな障害のないグローバル サービスに成功*2しています。これについては、サーバー アプリケーションの設計が洗練されていることの恩恵も大きいと考えています。

オンラインtoC事業とデータ破損

モバイルゲームや一般公開された Web サービスなどの「オンラインtoC事業」では、データ破損などの不具合について「ユーザーの不利益が発生するか」を軸に対策・対応を考えることが多いと思います。もちろんこれは重要な考え方です。とはいえ、それに甘えてアプリケーションにおけるデータの整合性設計を妥協すると、暗黙知や潜在的リスクの温床になります。

しかし、データの整合性という問題は、とんでもなく難しいものであることが誰の目にも明らかです。ACID 特性という言葉を引き合いに出すのは簡単ですが、一貫性(Consistency)は誰が責任を負うのか定義するところから始めないといけないし、分離(Isolation)に関しては "Serializable" の定義さえも多岐にわたる世界で、意図的であるか否かに関わらず不完全な実装を指して ACID と銘打っているのがIT業界の実態だと思います。これは業界が悪いのではなく、「扱う問題がとんでもなく難しい」というだけです。

このような難しい問題への対処(=トランザクション処理)をアプリケーション開発者がフルスクラッチで実装するようなことは現実的ではありません。 だからこそ、多くの現場では RDBMS のトランザクション実装に乗っかってデータ破損を防いでいると思います。しかし、外部 API を操作する部分はどうなっているでしょうか?ちょっと目を瞑りたくなるような話題ですが、今回の本題は原子性(Atomicity)だけでも頑張ろうというものです。

もちろん、ACID の各要素は他の要素と互いに不可分*3ですし、現実的にも完全なものは目指せませんが、大事なのは「ないよりはある方がマシ」という精神です。だからこそタイトルにも頑張るという言葉を選んでいます。

DB ドライバに乗っかって、外部 API 操作の原子的操作を目指す

弊プロジェクトでは Cloud Spanner をデータベースに採用しています。Cloud Spanner は NewSQL のパイオニアという立ち位置で間違い無いと思いますが、売り文句と言えば無限スケーリングと強い整合性です。特に Cloud Spanner のトランザクションは Read-Only および Read-Write という二つのモードがサポートされており、後者は読み取る全ての行に対して共有ロックを置きます。

ということで、以降は Spanner を前提に Foo という外部 API にアクセスするような Go のコードを提案します。

なお、当然このままでは動きませんが、転用に関しても At your own risk でお願いします。また、他の DB や言語でも考え方を参考にできると思いますが、異なるものを本質的に別物として取り扱うことは同様に重要です。

トランザクションの開始部分

まず前提として、集約ごとにパッケージが分かれている状態とします。"Get the Compiler Involved" です。各集約のパッケージは ${AGG_PATH} に配置されているとします。

そうしたら、${AGG_PATH}/transaction/ に以下のようなファイルを作ります。ジェネリクスやインタフェースを用いたメタプログラミングを避けているのは意図的です。

transaction.go

package transaction

import (
    "context"
    "errors"

    "super.awesome.client/foo"

    "cloud.google.com/go/spanner"
)

type Txn struct {
    spanner *spanner.Client
}

func New(db *spanner.Client) *Txn {
    return &Txn{spanner: db}
}

// Execute は単純に Spanner のトランザクションを実行します。
func (t *Txn) Execute(ctx context.Context, fn func(context.Context, *DB) error) error {
    _, err := t.spanner.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
        return fn(ctx, newDB(txn))
    })
    return err
}

type TxnWithFoo struct {
    base *Txn
    foo  *foo.Client
}

func NewWithFoo(db *spanner.Client, foo  *foo.Client) *TxnWithFoo {
    return &TxnWithFoo{
        base: New(db),
        foo:  foo,
    }
}

// Execute は基盤となる Spanner のトランザクションを実行したのち、エラーが発生した場合のみ Foo の補償処理を実行します。
func (t *TxnWithFoo) Execute(ctx context.Context, fn func(context.Context, *DB, *AtomicFoo) error) error {
    compensator := &Compensator{}
    err := t.base.Execute(ctx, func(ctx context.Context, db *DB) error {
        return fn(ctx, db, NewAtomicFoo(compensator, t.foo))
    })
    if err != nil {
        // NOTE: ここの扱いは検討の余地あり
        return errors.Join(compensator.Do(), err)
    }
    return nil
}

言ってしまえばラッパーです。重要な部分を抜粋するとココですね。

// Execute は基盤となる Spanner のトランザクションを実行したのち、エラーが発生した場合のみ Foo の補償処理を実行します。
func (t *TxnWithFoo) Execute(ctx context.Context, fn func(context.Context, *DB, *AtomicFoo) error) error {
    compensator := &Compensator{}
    err := t.base.Execute(ctx, func(ctx context.Context, db *DB) error {
        return fn(ctx, db, NewAtomicFoo(compensator, t.foo))
    })
    if err != nil {
        // NOTE: ここの扱いは検討の余地あり
        return errors.Join(compensator.Do(), err)
    }
    return nil
}

CompensatorAtomicFoo が気になったと思います。

補償処理を実行する Compensator

Compensator という名前は SAGAパターン の補償トランザクションという概念に着想しています。

type Compensator struct {
    mu    sync.Mutex
    stack []func(ctx context.Context) error
}

上記の構造体に Register(rollback func(ctx context.Context) error) および Do() error を生やしています。

以下フル実装です。トランザクション開始処理と同じく ${AGG_PATH}/transaction/ 配下に置いています。先ほどのコードにもあったとおり、compensator.Do の扱いには検討の余地があると考えています。

compensator.go

package transaction

import (
    "context"
    "sync"
)

// Compensator は外部システムへのアクセスを念頭に、Atomic な処理を実現するためのロールバック機構です。
type Compensator struct {
    mu    sync.Mutex
    stack []func(ctx context.Context) error
}

// Register はロールバック処理を登録します。
func (c *Compensator) Register(rollback func(ctx context.Context) error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.stack = append(c.stack, rollback)
}

// Do はロールバックを登録と逆順に実行します。
func (c *Compensator) Do() error {
    c.mu.Lock()
    defer c.mu.Unlock()
    for i := len(c.stack) - 1; i >= 0; i-- {
        if err := c.stack[i](context.Background()); err != nil {
            return err
        }
    }
    return nil
}

原子的な操作を提供する AtomicFoo

そして本命の AtomicFoo ですが、概要としては以下のような構造体に DoSomething() という操作メソッドを実装するとしたら、そのメソッド内にて f.compensator.Register(...) を呼び出して巻き戻し操作のクロージャを与えるイメージです。

// AtomicFoo は外部 API を(擬似的に)アトミックに操作する方法を提供します。
type AtomicFoo struct {
    compensator *Compensator
    client      *foo.Client
}

操作としてはジョブの作成と開始メソッドを定義しており、それぞれに対して巻き戻し操作を登録しています。

  • CreateJob(ctx context.Context, manifest model.Manifest) (model.FooJob, error)
  • StartJob(ctx context.Context, jobID model.JobID) error

以下フル実装です。同様に ${AGG_PATH}/transaction/ に配置しています。 model パッケージの実装および外部 API の操作はアプリの要件と外部 API の仕様によって様変わりするので、f.compensator.Register(...) 呼び出しの雰囲気だけ掴んでいただければ大丈夫です。

foo.go

package transaction

import (
    "context"
    "errors"

    "super.awesome.client/foo"

    "${AGG_PATH}/model"
)

// AtomicFoo は外部 API を(擬似的に)アトミックに操作する方法を提供します。
type AtomicFoo struct {
    compensator *Compensator
    client      *foo.Client
}

func NewAtomicFoo(compensator *Compensator, client *foo.Client) *AtomicFoo {
    return &AtomicFoo{compensator: compensator, client: client}
}

// CreateJob は指定されたマニフェストに沿ってジョブを作成します。
func (f *AtomicFoo) CreateJob(ctx context.Context, manifest model.Manifest) (model.FooJob, error) {
    job, err := f.client.CreateJob(ctx, createJobArgFromModel(manifest))
    if err != nil {
        return errors.Wrap(err, "error  f.client.CreateJob")
    }
    // これ以降でエラーが発生した場合、ジョブは削除される!!
    defer f.compensator.Register(func(ctx context.Context) error {
        if err := f.client.DeleteJob(ctx, id); err != nil {
            return errors.Wrap(err, "error f.client.DeleteJob")
        }
        return nil
    })
    return jobToModel(job), nil
}

// StartJob は指定された jobID のジョブを開始します。
func (f *AtomicFoo) StartJob(ctx context.Context, jobID model.JobID) error {
    operation, err := f.client.StartJob(ctx, jobID); err != nil {
        return errors.Wrap(err, "error f.client.StartJob")
    }
    // これ以降でエラーが発生した場合、オペレーションの実行が停止される
    defer f.compensator.Register(func(ctx context.Context) error {
        if err := operation.Stop(ctx); err != nil {
            return errors.Wrap(err, "error operation.Stop")
        }
    })
    return nil
}

ここで注意ですが、Create に対する Delete は真逆の操作と言えそうですが、Start に対する Stop は「原子性」を語る上では何歩も妥協した対応関係にしかありません。もちろん Stop ではなく Cancel という命名であったとしても、外部システムにおける作用を完全に巻き戻す効果はないかもしれません。

しかし、大切なのは「ないよりはある方がマシ」という精神で、もっといえば、適当に目を瞑らないでちゃんと向き合うことです。

ユースケースはどうなるか

ということで、「トランザクションはユースケースで開始したいけど集約を跨ぐのは防ぎたい」という要望と同時に、「トランザクションに失敗した時に外部 API 操作もできるなら巻き戻したい」を叶えたユースケースのイメージが以下になります。ユースケースは、当然ながら transaction パッケージとは異なる場所にあります。おそらく、 ${AGG_PATH} の外側に配置されているはずです。

// ~ 略 ~
    txn := transaction.NewWithFoo(spn, fooClient)
    if err := txn.Execute(ctx, func(ctx context.Context, db *transaction.DB, foo *transaction.AtomicFoo) error {
        // DB 読み取り
        sth, err := db.FindSomething(ctx, id)
        if err != nil { ... }
        // 外部 API 操作
        job, err := foo.CreateJob(ctx, sth.Manifest())
        if err != nil { ... }
        if err := foo.StartJob(ctx,  job.ID); err != nil { ... }
        // DB 書き込み
        stg, err := db.UpdateSomething(ctx, sth.WithJob(job))
        if err != nil { ... }
        return nil
    }); err != nil { ... }
// ~ 略 ~

これは一つの集約しか扱わない前提ですが、例えば他の集約にもアクセスしようと思うと、import した別集約の transaction パッケージに別名をつける=使い分ける必要に迫られます。

また、DB にしか触らないユースケースでは以下のようにシンプルになります。このように、Execute に対して渡すシグニチャが変わるので「どのデータを操作するのか」が明確になります。

// ~ 略 ~
    txn := transaction.New(spn)
    if err := txn.Execute(ctx, func(ctx context.Context, db *transaction.DB) error {
    // ~ 略 ~
    }); err != nil { ... }
// ~ 略 ~

ここまで来てテストやバリデーションの戦略などが気になった人もいるかと思いますが、それらについて語るには余白が足りないので今回はここら辺にしようと思います。

まとめ

サーバー アプリケーションの実装・運用において、データの整合性と向き合うことは決して簡単ではありません。 そんな中で、以下を意識することが外部 API と仲良くするための一つのヒントになるのではないでしょうか。

  1. "Get the Compiler Involved" でデータの扱いを明確にしよう
  2. 外部 API にも目を瞑らず、ACID の A だけでも頑張ろう
  3. DB のトランザクション実装に乗っかろう

カヤックではプログラムを実際に書いて議論するのが好きなエンジニアも募集しています!

hubspot.kayac.com

*1:ワークショップ内での "DDD" 関連語彙の使い分けは非常に曖昧なものだったので、必要に応じて言い換えています。

*2:たとえばユーザーの同期リクエストを受け付ける API の可用性は 99.9% 以上を維持しています。

*3:あくまで筆者の考えです。単に勉強不足なだけかもしれません。