ソーシャルゲーム事業部の谷脇、またの名をid:mackee_wです。みなさんいかがお過ごしですか。
私は現在、東京プリズンというゲームアプリのサーバサイドを担当しています。東京プリズンはリアルタイムのボードゲーム的な殴り合いゲームなのですが、当然リアルタイム通信技術が使われています。
東京プリズンではリアルタイム通信に拙作のkuiperbeltというサーバミドルウェアが使われています。このミドルウェアは主にWebSocketの接続を管理します。kuiperbeltを使うと普通のHTTPしかしゃべることが出来ない言語・フレームワークであってもスケールするWebSocketアプリケーションを構築することができます。
……という話は今年のCEDEC 2018で行ったので、こちらのスライドを見てください。
さて、このkuiperbeltの発表から現在までに実装した機能を紹介します。
実装された機能
Receive Callback
上記の発表のときには「kuiperbeltは上り(クライアント->サーバ)方向の通信には対応していません。普通のREST APIとかを実装してお使いください」と言っていましたが、「ニーズは感じるので実装する予定はある」という話もしていました。なので実装したという話です。
上記スライドにもこういう図がでてくるのですが、
- 下り(サーバ -> クライアント)方向: Web Application --(HTTP/1.x)--> kuiperbelt --(WebSocket)--> client
- 上り(クライアント -> サーバ)方向: client --(HTTP/1.x)--> Web Application
というように、作者の気持ちとしては「上りは別にRESTで代用できるから良いでしょ」という感じでした。また、kuiperbeltが間に挟まることで構成が複雑になります。構成が複雑になるということは障害点が増えますし、リアルタイム通信において重要な要素であるレイテンシも増加します。
しかし思いの外WebSocket一本でやったほうが良いという声が聞かれました。2種類のコネクションを扱わないといけないクライアントとしても、実装が煩雑になるというデメリットが有りました。というわけで実装しました。
使い方は簡単で、バイナリ版を使っている方は、お手持ちのconfig.yml
に、
callback: receive: http://example.com/callback/receive
と言ったふうに、上り方向の通信が発生した際に、メッセージを受け流すURLを設定するだけでOKです。
DockerHub版をお使いの方は、環境変数EKBO_RECEIVE_CALLBACK_URL
に上記と同様のURLをセットしてください。
これら2つに対して何も行わなければ従来どおりの動作、つまり上り方向のメッセージはすべて捨てるという動作になります。
また、receive callbackのX-Kuiperbelt-Endpoint
は、connect callbackのときと同様、メッセージ発生元のクライアントがつながっているkuiperbeltの解決可能なホスト名が入ります。つまり、KVSなどに保存した接続情報を参照せずにメッセージを打ち返すことが可能です。と、これを書いているときに思いついたのですが、receive callbackのレスポンスでメッセージを打ち返すのもできるなと思ったのでやります。
是非使って感想をお聞かせください。
Rename Endpoint Path
もともとkuiperbeltのAPIの名前の付け方はおかしくて、APIに/connect
と/close
が対として存在します。社内からも/disconnect
(または/open
)じゃないんかい!というツッコミも頂いていますが、すでに使われている後なので変えようがない、v2.0.0でも出したときに変えるかという気分でした。
それとは別に、使っているWebSocketクライアントの制限などでパスを自由に決めることが出来ないというケースも存在します。また、本体のアプリケーションとロードバランサーで外から見えるホスト名を一緒にした場合に、パス名がかぶってイヤンというケースもあるでしょう。というわけで、設定ファイルからこのAPIのパス名を変えられるようにしようという試みです。
変えるにはお手元のconfig.yml
に、
path: connect: "/my_connect" close: "/my_disconnect" stats: "/my_stats" send: "/my_send" ping: "/my_ping"
というように、設定するだけでOKです。DockerHub版をお使いの方は、それぞれの設定を環境変数EKBO_CONNECT_PATH
EKBO_CLOSE_PATH
EKBO_STATS_PATH
EKBO_SEND_PATH
EKBO_PING_PATH
にセットすれば良いです。
何も設定しない場合は今までどおりの(ツッコミされる)パスになります。
実装された機能を使って、Socket.IOをしゃべる
上りやパス名前変更が実装されたkuiperbeltを使って何を作りましょうか。ワクワクしますね。そんなわけで、Socket.IOに対応させる試みをやってみようと思います。
なぜSocket.IOを使いたいか
私はkuiperbeltの機能デモとしてJavaScriptを用いますが、JavaScriptの場合、ブラウザJSにはWebSocket APIが搭載されているため、これを用いることになります。その際に困る点として、
- 再接続を自前でしないといけない
- Exponential Backoffなども考えて実装すると、言語も慣れてないので、かなりの確率でバグる
- シリアライズ・デシリアライズは自前
- ルーム機能がない
などがあります。特に再接続は大変で、以前CEDECで往復の通信レイテンシを測るアプリケーションを作ったときには、比較のためにWiFiと4Gの切り替えを行うとリトライしそこねて更にリトライしたコネクションが分裂してグラフが大変なことになってしまいました。これは実装力が足りないという話ではあるんですが、再接続ぐらいは何らかのライブラリに頼りたいところです。
そもそもSocket.IOが生のWebSocket APIに対して優位な点は、
- XHR, long polling, WebSocketと環境に応じて通信路を変えてくれる
- 再接続をしてくれる
- JSONでシリアライズ/デシリアライズをしてくれる
- ルームやネームスペース機能がある
などがあります。チャットルームなどを作るのであればSocket.IOを使うのが良い選択であると思われます。
ところで、通信路を選んでくれる機能ですが、WebSocketが実装されたWebブラウザがまばらで、Upgradeヘッダや101 Switching Protocolsを正しく解釈してくれないフォワードプロキシなどが存在するインターネットであれば大変有用な機能でした。しかし、昨今はフルhttps化によって、途中の経路はhttpsで包まれたWebSocketはただのTCP通信にしか見えません。つまりWebSocketが疎通しない環境というのはだいぶ減ってきています。そこでこの記事では、Socket.IOはWebSocketだけを使う設定にして使おうと思います。
つまりこういうことです。
const socket = io(uri, { transports: ["websocket"], });
With websocket transport only - Socket.IOを参考にしました。
Engine.IOプロトコルとSocket.IOプロトコル
Socket.IOアプリケーションのサーバサイドはほとんどがNode.jsで構築されます。Alternativeな実装として多言語への移植もあります。私が普段扱う言語であるPerl5であればPocketIOというモジュールがあります。
一方これからやろうとしているのは、WebSocketの接続管理のみを担当するkuiperbelt越しでSocket.IOをしゃべることです。先に挙げたサーバ実装たちは当然WebSocketを自分が直接しゃべるのを前提としています。
つまりSocket.IOプロトコルを実装することになります。Node.jsであればsocket.io-parser
を直接使うのでも良いでしょう。PocketIOもみたところパーサ実装部分だけ切り離して使えそうな雰囲気を感じます。ただ、ここは後学とチャレンジのために、プロトコルを自前実装してみることを考えます。
そもそもSocket.IOプロトコルはEngine.IOというプロトコル/クライアントの上に構築されています。Engine.IOは上記で話した「任意の通信路を選んで同じインターフェイスを実現する」という部分を実装しているクライアントです。そして、Engine.IOにもプロトコルがあり、Socket.IOにもプロトコルがあります。ただ、WebSocketだけを扱う場合においてはEngine.IOプロトコルの部分は対して複雑ではありません。また、Engine.IOのメッセージの中のSocket.IOプロトコルであれば、WebSocketの場合だろうがなんだろうが関係がありません。
以上の階層は以下の図で表すことが出来ます。
今回は、WebSocketの部分をkuiperbeltで実現します。kuiperbeltはWebSocketとHTTP/1.1の相互変換プロキシをという言い方もできます。WebSocketのメッセージをフレームごとにHTTPリクエストで分解し、WebアプリケーションのAPIへ投げる事ができます。また、kuiperbeltのAPIを叩くことで特定のWebSocket接続に対してメッセージを送信することが出来ます。
つまり、Webアプリケーション側でEngine.IOおよびSocket.IOのエンコーダー/デコーダーを実装すれば、Socket.IOでの通信が出来ます。
Engine.IOプロトコル及びSocket.IOプロトコルの概要は以下にあります。
GitHub - socketio/engine.io-protocol
Socket.IOの通信のやり取りの観察
まず実装する前に、本物のSocket.IOを使ったアプリケーションを実装してみて、サーバとクライアント間に簡易的なWebSocketプロキシを挟んでどういうふうにやり取りをしているかを観察してみます。
この観察に使ったコードは以下です。このコードはkuiperbeltと同様にGoで実装し、WebSocketの通信もkuiperbeltと同じくgorilla/websocketを用いました。
Poor Socket.IO Proxy for debug · GitHub
このプログラムが動作している様子はこちらです。
Engine.IO及びSocket.IOの接続開始手続き及びその後のPING/PONGのやり取りが行われています。ちなみに、WebSocket自体にもPING/PONGフレームという専用のものがあるのですが、Engine.IOではWebSocketを用いるときにテキストフレーム上にEngine.IOのPING/PONGタイプのメッセージを流して同様のことを行っているようです。
さて、以上の観察から、及び先に挙げたプロトコルの仕様を解説したページから、Socket.IOの通信は以下のような形式で行われていることが分かります。
これらは4
とか2
は16進数での表現ではなく、文字の4
です。ここまで分かればあとは実装するだけですね。
webchat-sampleで実装してみる
githubのkuiperbelt organizationにはサンプルアプリとして、kuiperbeltを使った簡単なチャットシステムの参考実装があります。
現在WebSocket APIを使っているのを、代わりにSocket.IOを使うようにしてみます。ちなみにkuiperbeltの裏側のWebアプリケーションはPerlで書かれています。
やることとしては、
- クライアントサイドで
new WebSocket
しているところをio()
に置き換える - socket.io自体はCDN版を使用する
- サーバサイドでconnect callback(WebSocket通信開始時に認証目的でkuiperbeltがWebアプリケーション側に投げるコールバック)を受けたときにEngine.IOのopenパケットを投げる
- サーバサイドでreceive callbackを実装する
- PINGパケットが来たらPONGパケットを返す
42
(Engine.IOがmessageでSocket.IOがevent)パケットが来たらイベントが来たと思って、Redisから現在つながっているクライアント情報を探してメッセージをブロードキャストする
です。
receiveは以下のようにsubstr
で実装したおもちゃみたいなエンコーダ/デコーダですが、動いています。
my $endpoint = $req->header("X-Kuiperbelt-Endpoint"); my $session = $req->header("X-Kuiperbelt-Session"); my $msg = $req->content; my $engineio_packet_type = substr($msg, 0, 1); if ($engineio_packet_type eq ENGINEIO_PACKET_TYPE->{ping}) { send_message($endpoint, ENGINEIO_PACKET_TYPE->{pong}, $session); } elsif ($engineio_packet_type eq ENGINEIO_PACKET_TYPE->{message}) { my $socketio_packet_type = substr($msg, 1, 1); if ($socketio_packet_type eq SOCKETIO_PACKET_TYPE->{event}) { my $message = substr($msg, 2); my $event = decode_json($message); if (ref $event eq "ARRAY" && scalar(@$event) == 2 && $event->[0] eq "message") { my $escaped_message = escape_html($event->[1]); my $engineio_message = ENGINEIO_PACKET_TYPE->{message} . SOCKETIO_PACKET_TYPE->{event} . encode_json(["message", $escaped_message]); broadcast_message($engineio_message); $redis->rpush("messages", $escaped_message); } else { warn sprintf("unexpected event message structure: %s", $msg); } } elsif ($socketio_packet_type eq SOCKETIO_PACKET_TYPE->{connect}) { # nop } else { warn sprintf("not support Socket.IO message type: %s", $msg); } } else { warn sprintf("not support Engine.IO message type: %s", $msg); }
ハマった点
Engine.IOのopen
を投げた後にSocket.IOのconnect
を投げないといけない
Engine.IOの接続開始時のopen
メッセージは以下のように組み立てています。
my $resp = ENGINEIO_PACKET_TYPE->{open} . encode_json({ sid => $session, upgrades => [], pingInterval => PING_INTERVAL, pingTimeout => PING_TIMEOUT, });
内容から分かる通り、open
パケットを表す識別子を付けた後にJSONでPING間隔やタイムアウトを返しています。
これで接続確立するだろうと思ったらうまくいきませんでした。Engine.IOのPING/PONGはやりとりしているものの、クライアント側のon("connect")
だったり、on("message")
が発火できませんし、emit
も効きません。どうやらSocket.IOの部分では接続がまだできていない状態のようです。
クライアント側の書き方が悪いのかとデバッグし続けたのですが、なかなか原因がわかりません。そこで何か見落としはないかと本物のSocket.IO同士のやり取りを観察していると、Engine.IOのopen
を投げた後に、さらにサーバからクライアントへSocket.IO
のconnect
が飛んでいることが分かりました。そこで初めてきたPINGの応答を返す前にconnect
を返すと接続確立ができその後チャットもできるようになりました。
しかしまたここで問題です。kuiperbeltのconnect callbackのレスポンスはWebSocket通信確立後に初めてサーバからクライアントへ送るメッセージとなっていて、このSocket.IOのアプリではEngine.IOのopen
を送っています。つまりこの前にkuiperbeltの/send
APIを使ってメッセージを送ろうとしても到達しません。connect callbackのレスポンスを返した後にSocket.IOのconnect
を送らないといけません。
Goであればgoroutineを起動させてちょっとsleepさせたあとに投げれば良いのですが、PerlのPreforkアーキテクチャであれば外部に何らかの遅延ジョブキューなどを置いて対処するという方法があります。実際に東京プリズンではSQSを用いて同様のことを行っています。ですが、サンプルアプリにジョブキューを入れるのは少し大袈裟すぎます。「初めてPINGが飛んできたときにconnect
を送る」というのも試してみてうまく動きましたが、チャットが機能するようになるまで時間がかかりました。
というわけで、SIGALRMを使ったシグナル割り込みを使ってお茶を濁しました。
$redis->rpush("kuiperbelt_socketiostarting", $session); $SIG{ALRM} = sub { while (my $session = $redis->lpop("kuiperbelt_socketiostarting")) { send_message($endpoint, ENGINEIO_PACKET_TYPE->{message} . SOCKETIO_PACKET_TYPE->{connect}, $session); $redis->set("kuiperbelt_socketiostarting:$session", 1); } }; alarm 2;
2秒という数字にあまり根拠はないです。2秒の間に複数のリクエストが来たときも想定して、Redis Listに突っ込んでますが、複数処理したときにどうなるかまではちょっと検討してません。他に何かいい方法があれば教えてください。
io(uri)
するときにパスが付いているとそれがネームスペースになる
kuiperbeltはもともとhttps://kuiperbe.example.com/connect
のようなURLがエンドポイントで、/connect
は固定でした。今回いじってるチャットのサンプルアプリはまずエンドポイントAPIなるもを叩いて、そこからWebSocketのつなぎ先を教えてもらう方式だったのですが、/connect
と入っているURLをio
関数に渡すと/connect
というネームスペースに接続するという動作になるようです。それを無視してネームスペース指定なしで送ると何も起こらない状態になったため、接続先URLから/connect
を取り除く対処が必要でした。
接続先URLは末尾に必ず/
が付いていないといけない
上記に関連するのですが、kuiperbeltはもともと/connect
でWebSocketのリクエストを待ち受けているわけですが、Socket.IOのデフォルトは/socket.io/
です。ただSocket.IO側は変えることが出来ます。
const socket = io(uri, { path: "/connect" });
これでイケる!と思いましたがなかなかうまく行きません。サーバ側で404を返しているログを見ると/connect/?...
となっています。パスとquery stringの間にスラッシュが付いているのでこれ外せないかとSocket.IOのドキュメントやコードを読んだのでしたが、絶対にスラッシュつけるマンという様子でした。
そこで、kuiperbelt側が折れることになります。冒頭の新機能紹介にあったパス変更機能に至るわけです。やはり実際に使ってみないと必要な機能というのは分からないものですね。
この後の展望
前回長田さんが私の記事を予告するときに用いた文言を見ますと「kuiperbeltの新機能 gRPC Adapterを使ったSocket.IO対応」となっています。つまり今回はPerlのWebアプリケーションが直接Socket.IOをしゃべるという内容だったのですが、本来、そして今どきのアプリケーション構成として正しいのは
- kuiperbeltがHTTPのコールバック以外にgRPCをサポートする
- 同居するコンテナアプリケーション(sidecar)として、Socket.IOをしゃべるデーモンを作る
- 2のデーモンがルーム管理を行ったり、APIで操作できる
というものです。来年は未来を目指していきます。
明日は遠藤さんで、「AI Chat Botの話」です。お楽しみに!