kuiperbeltでWebSocketから卒業してみてSocket.IOと共に歩んでみる

ソーシャルゲーム事業部の谷脇、またの名をid:mackee_wです。みなさんいかがお過ごしですか。

私は現在、東京プリズンというゲームアプリのサーバサイドを担当しています。東京プリズンはリアルタイムのボードゲーム的な殴り合いゲームなのですが、当然リアルタイム通信技術が使われています。

東京プリズンではリアルタイム通信に拙作のkuiperbeltというサーバミドルウェアが使われています。このミドルウェアは主にWebSocketの接続を管理します。kuiperbeltを使うと普通のHTTPしかしゃべることが出来ない言語・フレームワークであってもスケールするWebSocketアプリケーションを構築することができます。

……という話は今年のCEDEC 2018で行ったので、こちらのスライドを見てください。

speakerdeck.com

さて、このkuiperbeltの発表から現在までに実装した機能を紹介します。

実装された機能

Receive Callback

上記の発表のときには「kuiperbeltは上り(クライアント->サーバ)方向の通信には対応していません。普通のREST APIとかを実装してお使いください」と言っていましたが、「ニーズは感じるので実装する予定はある」という話もしていました。なので実装したという話です。

f:id:mackee_w:20181217205700p:plain
上り非対応時のアーキテクチャ

上記スライドにもこういう図がでてくるのですが、

  • 下り(サーバ -> クライアント)方向: Web Application --(HTTP/1.x)--> kuiperbelt --(WebSocket)--> client
  • 上り(クライアント -> サーバ)方向: client --(HTTP/1.x)--> Web Application

というように、作者の気持ちとしては「上りは別にRESTで代用できるから良いでしょ」という感じでした。また、kuiperbeltが間に挟まることで構成が複雑になります。構成が複雑になるということは障害点が増えますし、リアルタイム通信において重要な要素であるレイテンシも増加します。

しかし思いの外WebSocket一本でやったほうが良いという声が聞かれました。2種類のコネクションを扱わないといけないクライアントとしても、実装が煩雑になるというデメリットが有りました。というわけで実装しました。

使い方は簡単で、バイナリ版を使っている方は、お手持ちのconfig.ymlに、

callback:
  receive: http://example.com/callback/receive

と言ったふうに、上り方向の通信が発生した際に、メッセージを受け流すURLを設定するだけでOKです。

DockerHub版をお使いの方は、環境変数EKBO_RECEIVE_CALLBACK_URLに上記と同様のURLをセットしてください。

これら2つに対して何も行わなければ従来どおりの動作、つまり上り方向のメッセージはすべて捨てるという動作になります。

また、receive callbackのX-Kuiperbelt-Endpointは、connect callbackのときと同様、メッセージ発生元のクライアントがつながっているkuiperbeltの解決可能なホスト名が入ります。つまり、KVSなどに保存した接続情報を参照せずにメッセージを打ち返すことが可能です。と、これを書いているときに思いついたのですが、receive callbackのレスポンスでメッセージを打ち返すのもできるなと思ったのでやります。

是非使って感想をお聞かせください。

Rename Endpoint Path

もともとkuiperbeltのAPIの名前の付け方はおかしくて、APIに/connect/closeが対として存在します。社内からも/disconnect(または/open)じゃないんかい!というツッコミも頂いていますが、すでに使われている後なので変えようがない、v2.0.0でも出したときに変えるかという気分でした。

それとは別に、使っているWebSocketクライアントの制限などでパスを自由に決めることが出来ないというケースも存在します。また、本体のアプリケーションとロードバランサーで外から見えるホスト名を一緒にした場合に、パス名がかぶってイヤンというケースもあるでしょう。というわけで、設定ファイルからこのAPIのパス名を変えられるようにしようという試みです。

変えるにはお手元のconfig.ymlに、

path:
  connect: "/my_connect"
  close: "/my_disconnect"
  stats: "/my_stats"
  send: "/my_send"
  ping: "/my_ping"

というように、設定するだけでOKです。DockerHub版をお使いの方は、それぞれの設定を環境変数EKBO_CONNECT_PATH EKBO_CLOSE_PATH EKBO_STATS_PATH EKBO_SEND_PATH EKBO_PING_PATHにセットすれば良いです。

何も設定しない場合は今までどおりの(ツッコミされる)パスになります。

実装された機能を使って、Socket.IOをしゃべる

上りやパス名前変更が実装されたkuiperbeltを使って何を作りましょうか。ワクワクしますね。そんなわけで、Socket.IOに対応させる試みをやってみようと思います。

なぜSocket.IOを使いたいか

私はkuiperbeltの機能デモとしてJavaScriptを用いますが、JavaScriptの場合、ブラウザJSにはWebSocket APIが搭載されているため、これを用いることになります。その際に困る点として、

  • 再接続を自前でしないといけない
    • Exponential Backoffなども考えて実装すると、言語も慣れてないので、かなりの確率でバグる
  • シリアライズ・デシリアライズは自前
  • ルーム機能がない

などがあります。特に再接続は大変で、以前CEDECで往復の通信レイテンシを測るアプリケーションを作ったときには、比較のためにWiFiと4Gの切り替えを行うとリトライしそこねて更にリトライしたコネクションが分裂してグラフが大変なことになってしまいました。これは実装力が足りないという話ではあるんですが、再接続ぐらいは何らかのライブラリに頼りたいところです。

そもそもSocket.IOが生のWebSocket APIに対して優位な点は、

  • XHR, long polling, WebSocketと環境に応じて通信路を変えてくれる
  • 再接続をしてくれる
  • JSONでシリアライズ/デシリアライズをしてくれる
  • ルームやネームスペース機能がある

などがあります。チャットルームなどを作るのであればSocket.IOを使うのが良い選択であると思われます。

ところで、通信路を選んでくれる機能ですが、WebSocketが実装されたWebブラウザがまばらで、Upgradeヘッダや101 Switching Protocolsを正しく解釈してくれないフォワードプロキシなどが存在するインターネットであれば大変有用な機能でした。しかし、昨今はフルhttps化によって、途中の経路はhttpsで包まれたWebSocketはただのTCP通信にしか見えません。つまりWebSocketが疎通しない環境というのはだいぶ減ってきています。そこでこの記事では、Socket.IOはWebSocketだけを使う設定にして使おうと思います。

つまりこういうことです。

const socket = io(uri, {
    transports: ["websocket"],
});

With websocket transport only - Socket.IOを参考にしました。

Engine.IOプロトコルとSocket.IOプロトコル

Socket.IOアプリケーションのサーバサイドはほとんどがNode.jsで構築されます。Alternativeな実装として多言語への移植もあります。私が普段扱う言語であるPerl5であればPocketIOというモジュールがあります。

一方これからやろうとしているのは、WebSocketの接続管理のみを担当するkuiperbelt越しでSocket.IOをしゃべることです。先に挙げたサーバ実装たちは当然WebSocketを自分が直接しゃべるのを前提としています。

つまりSocket.IOプロトコルを実装することになります。Node.jsであればsocket.io-parserを直接使うのでも良いでしょう。PocketIOもみたところパーサ実装部分だけ切り離して使えそうな雰囲気を感じます。ただ、ここは後学とチャレンジのために、プロトコルを自前実装してみることを考えます。

そもそもSocket.IOプロトコルはEngine.IOというプロトコル/クライアントの上に構築されています。Engine.IOは上記で話した「任意の通信路を選んで同じインターフェイスを実現する」という部分を実装しているクライアントです。そして、Engine.IOにもプロトコルがあり、Socket.IOにもプロトコルがあります。ただ、WebSocketだけを扱う場合においてはEngine.IOプロトコルの部分は対して複雑ではありません。また、Engine.IOのメッセージの中のSocket.IOプロトコルであれば、WebSocketの場合だろうがなんだろうが関係がありません。

以上の階層は以下の図で表すことが出来ます。

f:id:mackee_w:20181218103342p:plain

今回は、WebSocketの部分をkuiperbeltで実現します。kuiperbeltはWebSocketとHTTP/1.1の相互変換プロキシをという言い方もできます。WebSocketのメッセージをフレームごとにHTTPリクエストで分解し、WebアプリケーションのAPIへ投げる事ができます。また、kuiperbeltのAPIを叩くことで特定のWebSocket接続に対してメッセージを送信することが出来ます。

つまり、Webアプリケーション側でEngine.IOおよびSocket.IOのエンコーダー/デコーダーを実装すれば、Socket.IOでの通信が出来ます。

Engine.IOプロトコル及びSocket.IOプロトコルの概要は以下にあります。

GitHub - socketio/engine.io-protocol

GitHub - socketio/socket.io-protocol: Socket.IO 1.0 Protocol specification and parser component / node.js module.

Socket.IOの通信のやり取りの観察

まず実装する前に、本物のSocket.IOを使ったアプリケーションを実装してみて、サーバとクライアント間に簡易的なWebSocketプロキシを挟んでどういうふうにやり取りをしているかを観察してみます。

この観察に使ったコードは以下です。このコードはkuiperbeltと同様にGoで実装し、WebSocketの通信もkuiperbeltと同じくgorilla/websocketを用いました。

Poor Socket.IO Proxy for debug · GitHub

このプログラムが動作している様子はこちらです。

f:id:mackee_w:20181218105051p:plain

Engine.IO及びSocket.IOの接続開始手続き及びその後のPING/PONGのやり取りが行われています。ちなみに、WebSocket自体にもPING/PONGフレームという専用のものがあるのですが、Engine.IOではWebSocketを用いるときにテキストフレーム上にEngine.IOのPING/PONGタイプのメッセージを流して同様のことを行っているようです。

さて、以上の観察から、及び先に挙げたプロトコルの仕様を解説したページから、Socket.IOの通信は以下のような形式で行われていることが分かります。

f:id:mackee_w:20181218102443p:plain

これらは4とか2は16進数での表現ではなく、文字の4です。ここまで分かればあとは実装するだけですね。

webchat-sampleで実装してみる

githubのkuiperbelt organizationにはサンプルアプリとして、kuiperbeltを使った簡単なチャットシステムの参考実装があります。

github.com

現在WebSocket APIを使っているのを、代わりにSocket.IOを使うようにしてみます。ちなみにkuiperbeltの裏側のWebアプリケーションはPerlで書かれています。

やることとしては、

  • クライアントサイドでnew WebSocketしているところをio()に置き換える
  • socket.io自体はCDN版を使用する
  • サーバサイドでconnect callback(WebSocket通信開始時に認証目的でkuiperbeltがWebアプリケーション側に投げるコールバック)を受けたときにEngine.IOのopenパケットを投げる
  • サーバサイドでreceive callbackを実装する
    • PINGパケットが来たらPONGパケットを返す
    • 42(Engine.IOがmessageでSocket.IOがevent)パケットが来たらイベントが来たと思って、Redisから現在つながっているクライアント情報を探してメッセージをブロードキャストする

です。

receiveは以下のようにsubstrで実装したおもちゃみたいなエンコーダ/デコーダですが、動いています。

    my $endpoint = $req->header("X-Kuiperbelt-Endpoint");
    my $session  = $req->header("X-Kuiperbelt-Session");
    my $msg      = $req->content;

    my $engineio_packet_type = substr($msg, 0, 1);
    if ($engineio_packet_type eq ENGINEIO_PACKET_TYPE->{ping}) {
        send_message($endpoint, ENGINEIO_PACKET_TYPE->{pong}, $session);
    }
    elsif ($engineio_packet_type eq ENGINEIO_PACKET_TYPE->{message}) {
        my $socketio_packet_type = substr($msg, 1, 1);
        if ($socketio_packet_type eq SOCKETIO_PACKET_TYPE->{event}) {
            my $message = substr($msg, 2);
            my $event = decode_json($message);
            if (ref $event eq "ARRAY" && scalar(@$event) == 2 && $event->[0] eq "message") {
                my $escaped_message = escape_html($event->[1]);
                my $engineio_message =
                    ENGINEIO_PACKET_TYPE->{message} .
                    SOCKETIO_PACKET_TYPE->{event} .
                    encode_json(["message", $escaped_message]);
                broadcast_message($engineio_message);
                $redis->rpush("messages", $escaped_message);
            }
            else {
                warn sprintf("unexpected event message structure: %s", $msg);
            }
        }
        elsif ($socketio_packet_type eq SOCKETIO_PACKET_TYPE->{connect}) {
            # nop
        }
        else {
            warn sprintf("not support Socket.IO message type: %s", $msg);
        }
    }
    else {
        warn sprintf("not support Engine.IO message type: %s", $msg);
    }

ハマった点

Engine.IOのopenを投げた後にSocket.IOのconnectを投げないといけない

Engine.IOの接続開始時のopenメッセージは以下のように組み立てています。

my $resp = ENGINEIO_PACKET_TYPE->{open} . encode_json({
    sid => $session,
    upgrades => [],
    pingInterval => PING_INTERVAL,
    pingTimeout  => PING_TIMEOUT,
});

内容から分かる通り、openパケットを表す識別子を付けた後にJSONでPING間隔やタイムアウトを返しています。

これで接続確立するだろうと思ったらうまくいきませんでした。Engine.IOのPING/PONGはやりとりしているものの、クライアント側のon("connect")だったり、on("message")が発火できませんし、emitも効きません。どうやらSocket.IOの部分では接続がまだできていない状態のようです。

クライアント側の書き方が悪いのかとデバッグし続けたのですが、なかなか原因がわかりません。そこで何か見落としはないかと本物のSocket.IO同士のやり取りを観察していると、Engine.IOのopenを投げた後に、さらにサーバからクライアントへSocket.IOconnectが飛んでいることが分かりました。そこで初めてきたPINGの応答を返す前にconnectを返すと接続確立ができその後チャットもできるようになりました。

しかしまたここで問題です。kuiperbeltのconnect callbackのレスポンスはWebSocket通信確立後に初めてサーバからクライアントへ送るメッセージとなっていて、このSocket.IOのアプリではEngine.IOのopenを送っています。つまりこの前にkuiperbeltの/sendAPIを使ってメッセージを送ろうとしても到達しません。connect callbackのレスポンスを返した後にSocket.IOのconnectを送らないといけません。

Goであればgoroutineを起動させてちょっとsleepさせたあとに投げれば良いのですが、PerlのPreforkアーキテクチャであれば外部に何らかの遅延ジョブキューなどを置いて対処するという方法があります。実際に東京プリズンではSQSを用いて同様のことを行っています。ですが、サンプルアプリにジョブキューを入れるのは少し大袈裟すぎます。「初めてPINGが飛んできたときにconnectを送る」というのも試してみてうまく動きましたが、チャットが機能するようになるまで時間がかかりました。

というわけで、SIGALRMを使ったシグナル割り込みを使ってお茶を濁しました。

    $redis->rpush("kuiperbelt_socketiostarting", $session);

    $SIG{ALRM} = sub {
        while (my $session = $redis->lpop("kuiperbelt_socketiostarting")) {
            send_message($endpoint, ENGINEIO_PACKET_TYPE->{message} . SOCKETIO_PACKET_TYPE->{connect}, $session);
            $redis->set("kuiperbelt_socketiostarting:$session", 1);
        }
    };
    alarm 2;

2秒という数字にあまり根拠はないです。2秒の間に複数のリクエストが来たときも想定して、Redis Listに突っ込んでますが、複数処理したときにどうなるかまではちょっと検討してません。他に何かいい方法があれば教えてください。

io(uri)するときにパスが付いているとそれがネームスペースになる

kuiperbeltはもともとhttps://kuiperbe.example.com/connectのようなURLがエンドポイントで、/connectは固定でした。今回いじってるチャットのサンプルアプリはまずエンドポイントAPIなるもを叩いて、そこからWebSocketのつなぎ先を教えてもらう方式だったのですが、/connectと入っているURLをio関数に渡すと/connectというネームスペースに接続するという動作になるようです。それを無視してネームスペース指定なしで送ると何も起こらない状態になったため、接続先URLから/connectを取り除く対処が必要でした。

接続先URLは末尾に必ず/が付いていないといけない

上記に関連するのですが、kuiperbeltはもともと/connectでWebSocketのリクエストを待ち受けているわけですが、Socket.IOのデフォルトは/socket.io/です。ただSocket.IO側は変えることが出来ます。

const socket = io(uri, { path: "/connect" });

これでイケる!と思いましたがなかなかうまく行きません。サーバ側で404を返しているログを見ると/connect/?...となっています。パスとquery stringの間にスラッシュが付いているのでこれ外せないかとSocket.IOのドキュメントやコードを読んだのでしたが、絶対にスラッシュつけるマンという様子でした。

engine.io-client/socket.js at 696c7e7efc9c8d388022149c3e5074e77284badb · socketio/engine.io-client · GitHub

そこで、kuiperbelt側が折れることになります。冒頭の新機能紹介にあったパス変更機能に至るわけです。やはり実際に使ってみないと必要な機能というのは分からないものですね。

この後の展望

前回長田さんが私の記事を予告するときに用いた文言を見ますと「kuiperbeltの新機能 gRPC Adapterを使ったSocket.IO対応」となっています。つまり今回はPerlのWebアプリケーションが直接Socket.IOをしゃべるという内容だったのですが、本来、そして今どきのアプリケーション構成として正しいのは

  1. kuiperbeltがHTTPのコールバック以外にgRPCをサポートする
  2. 同居するコンテナアプリケーション(sidecar)として、Socket.IOをしゃべるデーモンを作る
  3. 2のデーモンがルーム管理を行ったり、APIで操作できる

というものです。来年は未来を目指していきます。

明日は遠藤さんで、「AI Chat Botの話」です。お楽しみに!