前回の記事にて、バッチ化とストリーム化によってgRPCの性能が向上することを示した。残る高速化の手段は、非同期APIを利用することである。結論としては、サーバ側では非同期APIを利用すると性能がかなり向上する。これを実装したTkrzw-RPC 0.7.1をリリースしたので、お試しいただきたい。
この記事では、同期APIと非同期APIの性能測定をして、非同期APIの優位性を示す。また、C++の非同期APIの解説がネット上にあまりに少なかったので、実装の進め方を詳細にメモしておく。
gRPCのC/C++ネイティブAPIには同期APIと非同期APIがある。また、サーバ側とクライアント側でそれぞれ同期APIか非同期APIを選択でき、プロトコル上はそのいかなる組み合わせも許容される。つまり、サーバ側が非同期APIを使ってクライアント側が同期APIを使ったり、その逆の構成であったりしても、問題なく通信できるようになっている。
同期APIを使ったアプリケーションは、構造が単純なので実装が楽だが、スループットは低くなる傾向にある。非同期APIのアプリケーションはその逆で、実装はやたら複雑になるが、うまいこと実装すれば高いスループットが得られる。普通、クライアント側は分散しているので個々のクライアントのスループットが問題になることは少ない。よって、クライアント側では同期APIを選択して、メンテナンス性を重視するというのは合理的な選択だ。一方で、サーバ側はスループットが最重要課題になるので、競争力のある製品を作りたければ、非同期API一択と言って良い。
なぜ同期APIが遅いのかを簡単に説明する。既にRPCのコネクションが張ってあって、同期APIにてリクエストを送信し、レスポンスを受信するとしよう。その場合、非常に端的には、以下の事象が順に起こる。
- クライアントは、sendシステムコールにて、リクエストのメッセージを送る。
- メッセージの全体を送るまで、そのスレッドはブロッキングしながらsendを続ける。
- クライアントは、recvシステムコールにて、即座にレスポンスを読もうとするが、まだレスポンスは送られていないので、ブロックする。
- サーバは、epollシステムコールにて、とあるソケットがパケットを受信したことを検知する。
- サーバは、recvシステムコールにてリクエストのメッセージを受信する。
- メッセージの全体を受け取るまで、そのスレッドはブロッキングしながらrecvを続ける。
- サーバは、リクエストの内容に基づいて処理を行い、レスポンスを生成する。
- サーバは、sendシステムコールにて、レスポンスのメッセージを送る。
- メッセージの全体を送るまで、そのスレッドはブロッキングしながらsendを続ける。
- クライアントは、recvのブロックキングが解けて、レスポンスを受け取る。
- メッセージの全体を受け取るまで、そのスレッドはブロッキングしながらrecvを続ける。
同期APIが遅い最大の原因は、個々のリクエストとレスポンスの組毎に、sendとrecvが繰り返されることである。クライアント側では、1回のsend呼び出しで送れるのは最大1回のリクエストに過ぎず、1回のrecv呼び出しで受け取れるのは1回のレスポンスに過ぎない。サーバ側では、1回のrecvで受け取れるのは最大1回のリクエストに過ぎず、1回のsend呼び出しで送れるのは1回のレスポンスに過ぎない。つまり、システムコールの呼び出し回数が多くなるのと、個々のシステムコールの呼び出しでブロッキングによる遅延が発生するのが、スループットを低下させる。
一方で、非同期APIでは、クライアント側でもサーバ側でも、データを送る際には、sendする代わりに送信用のキューにデータを入れるだけである。それはメモリ上だけで行われるノンブロッキングの処理である。データを受け取る際には、キューに入れられているデータを受け取るだけである。これもノンブロッキングの処理である。送信キューに入れられた複数のメッセージは、適当な長さにまとめられた上でsendシステムコールで一気に相手に送られる。ソケットがデータを受信した際には、それを一気に読み込んで個々のメッセージに分割されて受信キューに入れられれる。送信も受信もブロッキングされる可能性はあるが、ブロッキングしない方の処理が優先して実行される。よって、システムコールの呼び出し回数も少ないし、それがブロッキングされる確率も低くなるので、スループットが向上する。
性能測定をしよう。サーバは、同期APIモードでは、最大ワーカスレッド数4で運用する。非同期APIモードでは、スレッド数は1、2、4とする。クライアントとサーバは同じマシンで動かして、IPv4で接続する。
$ tkrzw_server --address "127.0.0.1:1978" --threads 4 $ tkrzw_server --address "127.0.0.1:1978" --async --threads 1 $ tkrzw_server --address "127.0.0.1:1978" --async --threads 2 $ tkrzw_server --address "127.0.0.1:1978" --async --threads 4
クライアント側では、以下の処理を行うEchoはデータベースの処理を全く行わないので、単にRPCのラウンドトリップの時間が図れる。それ以外はデータベースの操作を伴うので、実際の運用時の性能を占うのに良い。スレッド数は1、2、4で測定する。
- Echo : リクエストで送ったメッセージをそのまま返す
- Set : 普通のユナリーAPIによるSet操作
- Set-Stream : ストリームAPIによるSet操作
- Set-B-10 : 10リクエストをバッチ化したSet操作
- Set-B-100 : 100リクエストをバッチ化したSet操作
結果は以下のようになる。単位はQPSだ。
Sy 1 | Sy 2 | Sy 4 | |
Echo | 16,830 | 25,585 | 27,414 |
Set | 16,557 | 23,211 | 27,271 |
Set-Stream | 22,259 | 27,760 | 26,642 |
Set-B-10 | 133,372 | 203,831 | 237,732 |
Set-B-100 | 393,403 | 806,322 | 1,258,652 |
As1 1 | As1 2 | As1 4 | As2 1 | As2 2 | As2 4 | As4 1 | As4 2 | As4 4 | |
Echo | 20,329 | 28,079 | 39,987 | 19,601 | 27,024 | 34,807 | 20,093 | 27,290 | 30,735 |
Set | 20,145 | 27,329 | 35,609 | 19,562 | 24,945 | 30,538 | 19,560 | 25,370 | 30,291 |
Set-Stream | 29,586 | 35,667 | 34,192 | 28,721 | 35,782 | 31,471 | 29,451 | 35,799 | 30,941 |
Set-B-10 | 147,396 | 232,637 | 304,020 | 148,475 | 222,078 | 298,877 | 153,150 | 223,508 | 294,932 |
Set-B-100 | 431,015 | 945,345 | 1,040,406 | 540,021 | 903,255 | 1,461,627 | 504,011 | 962,187 | 1,504,228 |
比較しやすいように、同期APIの1スレッドを基準とした比率に直すとこうなる。
Sy 1 | Sy 2 | Sy 4 | As1 1 | As1 2 | As1 4 | As2 1 | As2 2 | As2 4 | As4 1 | As4 2 | As4 4 | |
Echo | 100% | 152% | 163% | 121% | 167% | 238% | 116% | 161% | 207% | 119% | 162% | 183% |
Set | 100% | 140% | 165% | 122% | 165% | 215% | 118% | 151% | 184% | 118% | 153% | 183% |
Set-Stream | 100% | 125% | 120% | 133% | 160% | 154% | 129% | 161% | 141% | 132% | 161% | 139% |
Set-B-10 | 100% | 153% | 178% | 111% | 174% | 228% | 111% | 167% | 224% | 115% | 168% | 221% |
Set-B-100 | 100% | 205% | 320% | 110% | 240% | 264% | 137% | 230% | 372% | 128% | 245% | 382% |
Echoの結果で見ると、同期APIに比べて、非同期APIは、1スレッドで1.2倍のスループットが出る。それだけだと大して嬉しくないのだが、スレッド数が増えた場合に効果が増すところが重要だ。同期APIでは4スレッドにしても163%しかスループットが上がらなかったが、非同期APIだと232%まで上がる。注目すべきは、非同期APIのサーバ側のスレッド数を増すほどに、Echoのスループットは下がるということだ。サーバ側のスレッドを増やす最大の理由はサーバ内部での処理を並列化することだが、Echoではサーバ内部の処理が皆無なので、意味がない。逆に、スレッド数を増やすほど、イベント通知等の負荷が上がるので、遅くなってしまう。
Setの結果を見ていこう。SetもEchoとほどんど結果が同じ傾向である。なぜなら、データベースが高速すぎて、Echoと対して変わらない結果になってしまっている。ある意味チートであるバッチ化の結果を除外して考えると、非同期APIのサーバ側4スレッドでクライアント側2スレッドで最大のスループットが出ている。4コアCPUなので、クライアントも同一マシンで動いてCPUを使っていることを考えると、サーバ側2スレッドの結果とサーバ側4スレッドの結果があまり変わらないのも仕方ない。サーバ側のスレッド数をCPUのコア数と同じにするというのがgRPC本家の推奨なのだが、クライアントがリモートマシンで動く場合にはそれが当てはまるように思える。バッチ化した場合にはRPC層のオーバーヘッドの比率が下がるので、サーバ側のスレッドがより有効的に使われるので、非同期の4スレッドが最速になる。
理論上、非同期APIかつストリームAPIを使うのが最速なのだが、この実験結果はだいたいそれに合致する結果にはなっている。とはいえ、非同期APIで、サーバ側とクライアント側のスレッド数を増やした場合には、非同期APIのユナリーAPIと非同期APIのストリームAPIの差は小さい。非同期APIは全ての評価項目で同期APIに優るので、同期APIを使う理由はもはやない。非同期APIのおいて、ユナリーAPIとストリームAPIのどちらを使うかだが、どちらでもよい。理論上はストリームAPIの方が効率がよいはずなので、とりあえずストリームAPIを使っておいて損はないだろう。
ともかく、クライアントとサーバが同一マシンで動いている状態でも、バッチ化せずとも35000 QPSまでは出ることがわかった。ちなみに、ストリームAPIの更新系の操作では、ignore_resultオプションでレスポンスを省略することで、スループットをさらに上げられる。同じ実験環境で、同期APIなら10万QPS、非同期APIなら13万QPSほど出る。実運用のように、クライアントを別マシンで分散させた状態であれば、もっと出せるはずだ。
サーバ側の実装を解説しよう。既に同期APIによる実装があって、そのテストも書いているので、それをできるだけ再利用して、非同期APIに対応する。最終的には、同期APIと非同期APIの二つのモードを起動時に切り替えて動作するようにしたい。実運用上は非同期APIモードしか使わないだろうが、動作検証や性能評価ができるという意味では同期APIモードが動作し続けるというのも価値があろう。基本的な実装はtkrzw_server_impl.hに書いてある。
まず、同期APIのかつての実装を思い出そう。tkrzw_rpc.protoをコンパイルすると、DBMService::Serviceというクラスが生成されるので、それを継承したクラスDBMServiceImplを定義し、その個々のRPCメソッドの実装をオーバーライドしてやれば良い。Echoの実装は以下のようになる。
class DBMServiceImpl : public DBMService::Service { public: grpc::Status Echo( grpc::ServerContext* context, const EchoRequest* request, EchoResponse* response) { response->set_echo(request->message()); return grpc::Status::OK; } };
このままだと再利用できないので、DBMServiceBaseというクラスを新たに作って、全てのRPCメソッドの実装をそこに移す。分かりやすさのため、名前の末尾にはImplを付ける。
class DBMServiceBase { public: grpc::Status EchoImpl( grpc::ServerContext* context, const EchoRequest* request, EchoResponse* response) { response->set_echo(request->message()); return grpc::Status::OK; } };
DBMServiceImplクラスは、DBMServiceBaseとDBMService::Serviceを多重継承し、個々のRPCメソッドはDBMServiceBaseから継承した実装に処理を移譲する。
class DBMServiceImpl : public DBMServiceBase, public DBMService::Service { public: grpc::Status Echo( grpc::ServerContext* context, const EchoRequest* request, EchoResponse* response) override { return EchoImpl(context, request, response); } };
これで同期APIは準備完了だ。次に、非同期APIの対応に移る。プロトコルバッファをコンパイルした結果、非同期API用にはDBMService::AsyncServiceというクラスが生成されているので、それと上述のDBMServiceBaseを他重継承したクラスDBMAsyncServiceImplを実装する。その中に、タスクキューのタスクを処理する関数と、タスクキューの終了処理を行う関数を書く。
class DBMAsyncServiceImpl : public DBMServiceBase, public DBMService::AsyncService { public: // タスクキューのタスクを逐次処理する void OperateQueue(grpc::ServerCompletionQueue* queue, const bool* is_shutdown); // タスクキューの終了処理を行う void ShutdownQueue(grpc::ServerCompletionQueue* queue); };
タスクキューのタスクを逐次処理するOperateQueueという関数が今回の肝である。これは、いわゆるイベントループを実装している。無限ループしつつ、キューにタスクがあればそれを取り出して処理し、呼び出し側からシャットダウンの指示があればループから抜ける。面白いのが、タスクオブジェクトの寿命管理をタスクオブジェクト自身がしているところだ。
inline void DBMAsyncServiceImpl::OperateQueue( grpc::ServerCompletionQueue* queue, const bool* is_shutdown) { // Echo用のタスクオブジェクトを「産卵」する // 自身の所有権は自身によってタスクキューに渡される new AsyncDBMProcessor<EchoRequest, EchoResponse>( this, queue, &DBMAsyncServiceImpl::RequestEcho, &DBMServiceBase::EchoImpl); // Echo以外メソッドのタスクオブジェクトもここで産卵しておく new AsyncDBMProcessor<InspectRequest, InspectResponse>( this, queue, &DBMAsyncServiceImpl::RequestInspect, &DBMServiceBase::InspectImpl); ... // イベントループ while (true) { // タスクキューから次のイベントを取り出す void* tag = nullptr; bool ok = false; if (!queue->Next(&tag, &ok)) { break; } // 生きているタスクは、AsyncDBMProcessorInterfaceのインタンスのはず auto* proc = static_cast<AsyncDBMProcessorInterface*>(tag); if (ok) { // タスクを処理する proc->Proceed(); } else { // クライアントからセッションが終了されたか、サーバがシャットダウンされた場合には、 // ここに来る。 // タスクをキャンセルする proc->Cancel(); // シャットダウンの場合は、イベントループから抜ける if (*is_shutdown) { break; } } } }
上記では、AsyncDBMProcessroという謎のテンプレートが使われている。これがタスクの処理を抽象化したものだ。リクエストのprotoメッセージをキューから読み込み、レスポンスのprotoメッセージを更新し、それをキューに書き込むという責務を全うする。実際には全ての処理はDBMServiceBaseのメソッドにやらせる。
各種のRPCメソッドを統一して扱うためのインターフェイス class AsyncDBMProcessorInterface { public: virtual ~AsyncDBMProcessorInterface() = default; // タスクの処理を進める virtual void Proceed() = 0; // タスクをキャンセルする virtual void Cancel() = 0; }; // リクエストとレスポンスとその読み書き方法をパラメータ化したテンプレート template<typename REQUEST, typename RESPONSE> class AsyncDBMProcessor : public AsyncDBMProcessorInterface { public: // ステートマシンの現在の状態 enum ProcState {CREATE, PROCESS, FINISH}; // キューからRPCセッションを取り出すためのメソッド型 typedef void (DBMService::AsyncService::*RequestCall)( grpc::ServerContext*, REQUEST*, grpc::ServerAsyncResponseWriter<RESPONSE>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); // リクエストを処理してレスポンスを生成するためのメソッド型 typedef grpc::Status (DBMServiceBase::*Call)( grpc::ServerContext*, const REQUEST*, RESPONSE*); // CREATE状態のオブジェクトを生成して、処理をひとつ進める AsyncDBMProcessor( DBMAsyncServiceImpl* service, grpc::ServerCompletionQueue* queue, RequestCall request_call, Call call) : service_(service), queue_(queue), request_call_(request_call), call_(call), context_(), responder_(&context_), proc_state_(CREATE), rpc_status_(grpc::Status::OK) { Proceed(); } // ステートマシンの状態に応じて、処理をひとつ進める void Proceed() override { if (proc_state_ == CREATE) { // CREATE状態の場合、キューからRPCセッションを取り出し、PROCES状態に遷移する。 // request_call_は実際にはServiceImpl::RequestEchoなどを呼び出す。 // キューに自身(this)を渡すことで、次のNextでまた自身のProceedが呼ばれる。 proc_state_ = PROCESS; (service_->*request_call_)(&context_, &request_, &responder_, queue_, queue_, this); } else if (proc_state_ == PROCESS) { // キュー経由の最初の呼び出し、既にリクエストが読み出されているので、それを処理する。 // 自分は次の呼び出しで死ぬので、子供を産卵して、後のことを託す。 // 自分の生存期間と子の生存期間が重複することで、ネットワーク処理とRPCの内容の処理 // が多重化される。 new AsyncDBMProcessor<REQUEST, RESPONSE>(service_, queue_, request_call_, call_); // リクエストからレスポンスを生成する処理を移譲する。 rpc_status_ = (service_->*call_)(&context_, &request_, &response_); // 処理の完了を通知するとともに、次のNextでまた自身のProceedを呼ばせる。 proc_state_ = FINISH; responder_.Finish(response_, rpc_status_, this); } else { // 自分で静かに死ぬ。通知はしない。 delete this; } } // キャンセル時にリソースを解放するために呼ばれる void Cancel() override { if (proc_state_ == PROCESS) { // 実行中のタスクは、キューに終了を通知した上で、次のProceedで死んでもらう proc_state_ = FINISH;; responder_.Finish(response_, rpc_status_, this); } else { // キューが既にシャットダウンしている場合にここに来る。 // 自分で静かに死ぬ。通知はしない。 delete this; } } private: DBMAsyncServiceImpl* service_; grpc::ServerCompletionQueue* queue_; RequestCall request_call_; Call call_; grpc::ServerContext context_; REQUEST request_; RESPONSE response_; grpc::ServerAsyncResponseWriter<RESPONSE> responder_; ProcState proc_state_; grpc::Status rpc_status_; };
タスクキューがシャットダウンされると、中のタスクのブロッキングは全て解除されて取り出せるようになるが、実際に取り出す処理は自分で書かねばならない。そのためのメソッドをShutdownQueueを実装する。gRPCのチュートリアルだとこの処理について全く述べられていないので、シグナルで死なない実装に組み込むと、assertで落ちてしまう。
void DBMAsyncServiceImpl::ShutdownQueue(grpc::ServerCompletionQueue* queue) { // タスクキューをシャットダウンする。 queue->Shutdown(); void* tag = nullptr; bool ok = false; // 全てのタスクを取り出す while (queue->Next(&tag, &ok)) { auto* proc = static_cast<AsyncDBMProcessorInterface*>(tag); // 静かに殺す。 delete proc; } }
非同期サーバのライブラリとしての実装は終わったので、あとはそれを駆動する実行可能バイナリのコードtkrzw_server.ccを改変する。非同期APIの部分だけを抜き出すと、以下のようになる。
void Process(...) { ... // 非同期サーバを作って、サーバビルダに登録 grpc::ServerBuilder builder; builder.AddListeningPort(address, grpc::InsecureServerCredentials()); auto service = std::make_unique<DBMAsyncServiceImpl>(dbms, &logger); builder.RegisterService(service.get()); // タスクキューを作る。マルチコア用に、複数作ってもよい。 std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> async_queues; async_queues.resize(num_threads); for (auto& async_queue : async_queues) { async_queue = builder.AddCompletionQueue(); } // サーバを作る std::unique_ptr<grpc::Server> server(builder.BuildAndStart()); assert(server.get() != nullptr); // シグナルハンドラを登録。 // こいつらはg_shutdownをtrueにして、server->Shutdown()を呼ぶ。 static bool g_shutdown = false; std::signal(SIGINT, ShutdownServer); std::signal(SIGTERM, ShutdownServer); std::signal(SIGQUIT, ShutdownServer); // スレッド毎にタスクキューを駆動するラムダ関数 auto task = [&](grpc::ServerCompletionQueue* queue) { async_service->OperateQueue(queue, &g_is_shutdown); }; // 上記をスレッド毎に動かす std::vector<std::thread> threads; for (auto& queue : async_queues) { threads.emplace_back(std::thread(task, queue.get())); } for (auto& thread : threads) { thread.join(); } // 個々のタスクキューをシャットダウンする for (auto& queue : async_queues) { async_service->ShutdownQueue(queue.get()); } ... }
ここまでで、サーバ側の非同期APIの使い方を網羅的に説明出来ている気がする。ただし、非同期のユナリーAPIについては解説したが、非同期のストリームAPIについてはまだ触れていない。これも注意を要するので、ここで紹介しておきたい。ストリームAPIのメソッドを実装するには、既に述べたAsyncDBMProcessorInterfaceを継承したクラスを書けばよい。Tkrzw-RPCでは、レコードを巡回するイテレータの実装にストリームAPIを利用しているが、そこでは1回のRPCリクエストの中で、ReadとWriteを繰り返す処理が行われる。よって、ステートマシンの状態が、CREATE, BEGIN, READ, WRITE, FINISHに細分化される。
class AsyncDBMProcessorIterate : public AsyncDBMProcessorInterface { public: enum ProcState {CREATE, BEGIN, READ, WRITE, FINISH}; // CREATE状態のタスクを生成して、処理を一つ進める。 AsyncDBMProcessorIterate( DBMAsyncServiceImpl* service, grpc::ServerCompletionQueue* queue) : service_(service), queue_(queue), context_(), stream_(&context_), proc_state_(CREATE), iter_(nullptr), dbm_index_(-1), rpc_status_(grpc::Status::OK) { Proceed(); } void Proceed() override { if (proc_state_ == CREATE) { // CREATE状態の場合、セッションを開始して、BEGIN状態に遷移 // タスクキューに自身を通知して、Proceedがまた呼ばれるようにする proc_state_ = BEGIN; service_->RequestIterate(&context_, &stream_, queue_, queue_, this); } else if (proc_state_ == BEGIN || proc_state_ == READ) { // タスクキュー経由の奇数回の呼び出しではここに来る。 if (proc_state_ == BEGIN) { // タスクキュー経由の1回目の呼び出しでは、子供を作って後事を託す。 new AsyncDBMProcessorIterate(service_, queue_); } // ストリーム内リクエストを読み込んで、タスクキューに自身を通知して、 // WRITE状態に遷移する。 proc_state_ = WRITE; request_.Clear(); stream_.Read(&request_, this); } else if (proc_state_ == WRITE) { // タスクキュー経由の偶数回の呼び出しではここに来る。 response_.Clear(); rpc_status_ = service_->IterateProcessOne( &iter_, &dbm_index_, &context_, request_, &response_); if (rpc_status_.ok()) { // ストリーム内レスポンスを書き込んで、タスクキューに自身を通知して、 // READ状態に遷移する。 proc_state_ = READ; stream_.Write(response_, this); } else { // エラー時にはセッション終了して、自身を通知 proc_state_ = FINISH;; stream_.Finish(rpc_status_, this); } } else { // エラー時かセッション終了時かキャンセル時にここに来る。 // 静かに死ぬ。通知はしない。 delete this; } } // キャンセル処理 void Cancel() override { if (proc_state_ == READ || proc_state_ == WRITE) { // 処理中のタスクは終了して、自身を通知 proc_state_ = FINISH;; stream_.Finish(rpc_status_, this); } else { // そうでないなら、静かに死ぬ。 delete this; } } private: DBMAsyncServiceImpl* service_; grpc::ServerCompletionQueue* queue_; grpc::ServerContext context_; grpc::ServerAsyncReaderWriter<IterateResponse, IterateRequest> stream_; ProcState proc_state_; std::unique_ptr<DBM::Iterator> iter_; int32_t dbm_index_; tkrzw::IterateRequest request_; tkrzw::IterateResponse response_; grpc::Status rpc_status_; };
非同期のストリームAPIの解説もマジで少ないので、把握するのにかなり苦労した。本家のチュートリアルでは、非同期APIかつストリームAPIの組み合わせの解説は存在しない。タスクが産卵して世代交代を続けるスタイルで面食らう上に、キューを使ってステートマシンを駆動するのがまた理解しづらい。全体像を把握すると、「なるほど!」と膝を打つのだが、そこまで行かずに挫折する人も多そうだ。
まとめ。gRPCの非同期APIは同期APIに比べて高性能なので、可能であれば使うべきだ。性能を重視するなら、同期APIを使う理由はない。非同期APIの欠点は、実装がやたら複雑になることだが、サーバを実装しようという漢であれば、その困難に立ち向かうべきだ。今回の解説がその一助になれば幸いだ。