C++11標準以降のasyncとfutureによる非同期処理は非常に使いやすく、とても簡単に非同期処理を実装することができる。それによって、マルチスレッドでは複数の処理を並列実行できた場合には、スループットが向上させられる。一方で、単一の処理をシングルスレッドで処理する場合には、スレッドを作ったりスレッド間のコンテキストスイッチをしたりするのにかかるオーバーヘッドで、スループットは低下する。それについて性能評価をした。スレッドプールを実装して、それとの比較も行った。
変数を100万回インクリメントするという単純な処理を考える。無理やりラムダ式を使ってそれを同期呼び出しするように実装するなら、以下のようになる。
TEST(ThreadUtilTest, TaskSimple) { constexpr int32_t num_tasks = 1000000; std::atomic_int32_t count = 0; for (int32_t i = 0; i < num_tasks; i++) { auto task = [&]() { count.fetch_add(1); }; task(); } EXPECT_EQ(num_tasks, count.load()); }
次に、std::asyncのstd::launch::deferredモードを使った実装を考える。std::asyncは任意のラムダ式の遅延評価を行う仕組みで、std::futureオブジェクトを返してくれる。std::launch::deferredモードを指定した場合、std::futureのwait/getメソッドを呼んだ時に評価される。つまり、遅延評価時に同期的にラムダ式が実行されるので、実際には非同期処理ではない。しかし、後述の非同期処理と全く同じシンタックスで記述できるのが利点だ。
TEST(ThreadUtilTest, TaskDeferred) { constexpr int32_t num_tasks = 1000000; std::atomic_int32_t count = 0; for (int32_t i = 0; i < num_tasks; i++) { auto task = [&]() { count.fetch_add(1); }; std::future<void> f = std::async(std::launch::deferred, task); f.wait(); } EXPECT_EQ(num_tasks, count.load()); }
上述のstd::launch::deferredをstd::launch::asyncに変えてやると、真の非同期処理が行われる。これは裏でスレッドが作られて、そのスレッドがラムダ式を処理するので、std::futureのwait/getを呼び出す前に別の処理を入れてやれば、並列処理が行われることになる。
TEST(ThreadUtilTest, TaskAsync) { constexpr int32_t num_tasks = 1000000; std::atomic_int32_t count = 0; for (int32_t i = 0; i < num_tasks; i++) { auto task = [&]() { count.fetch_add(1); }; std::future<void> f = std::async(std::launch::async, task); f.wait(); } EXPECT_EQ(num_tasks, count.load()); }
std::asyncはお手軽で良いのだが、毎回スレッドを作るので、実行時のオーバーヘッドが大きすぎる嫌いがある。そこで、スレッドプールを使って、予め決まった数で生成しておいたスレッドを使い回すのが常套手段だ。スレッドプールはまだ標準化されていないので、簡単なクラスを実装しておいた。コードはこちらである。それを使ったテストケースは以下のようになる。今回は1スレッドのみを使う。
TEST(ThreadUtilTest, TaskQueue) { constexpr int32_t num_tasks = 1000000; std::atomic_int32_t count = 0; tkrzw::TaskQueue queue; queue.Start(1); for (int32_t i = 0; i < num_tasks; i++) { auto task = [&]() { count.fetch_add(1); }; queue.Add(task); } queue.Stop(60); EXPECT_EQ(0, queue.GetSize()); EXPECT_EQ(num_tasks, count.load()); }
上述のスレッドプールの例だと、ループ内でタスクの結果を受け取っていないので、セマンティクスが異なることになる。それを合わせるべく、std::promiseとstd::futureで同期を取るようにしたのが以下の例である。
TEST(ThreadUtilTest, TaskQueueFuture) { constexpr int32_t num_tasks = 1000000; std::atomic_int32_t count = 0; tkrzw::TaskQueue queue; queue.Start(1); for (int32_t i = 0; i < num_tasks; i++) { std::promise<void> p; std::future<void> f = p.get_future(); auto task = [&]() { count.fetch_add(1); p.set_value(); }; queue.Add(task); f.wait(); } queue.Stop(0.05); EXPECT_EQ(0, queue.GetSize()); EXPECT_EQ(num_tasks, count.load()); }
なお、std::futureを使わなくてもstd::condition_variableで同等の処理が実装できる。ただし、やっていることはstd::futureとほぼ同じなので、効率は変わらない。std::futureを使った方が読みやすいので、直接std::condition_variableを使う理由はない。
TEST(ThreadUtilTest, TaskQueueCond) { constexpr int32_t num_tasks = 1000000; std::atomic_int32_t count = 0; tkrzw::TaskQueue queue; queue.Start(4); for (int32_t i = 0; i < num_tasks; i++) { std::mutex mutex; bool done = false; std::condition_variable cond; auto task = [&]() { count.fetch_add(1); { std::lock_guard<std::mutex> lock(mutex); done = true; } cond.notify_one(); }; queue.Add(task); std::unique_lock<std::mutex> lock(mutex); cond.wait(lock, [&]() { return done; }); } queue.Stop(0.05); EXPECT_EQ(0, queue.GetSize()); EXPECT_EQ(num_tasks, count.load()); }
結果発表。100万をかかった時間で割ってQPS(秒間スループット)を算出した。実行環境はCore i7 8550U(8コア、1.8GHz)上のUbuntu 20.10である。スレッドプールはスレッドの数によってオーバーヘッドが変わるので、1、2、4、8、16で別途測定した。
同期処理 | 1,428,571,428 QPS |
std::launch::deferred | 2,232,142 QPS |
std::launch::async | 59,988 QPS |
スレッドプール1 | 2,415,458 QPS |
スレッドプール2 | 1,371,742 QPS |
スレッドプール4 | 375,093 QPS |
スレッドプール8 | 434,782 QPS |
スレッドプール16 | 457,875 QPS |
スレッドプール1+future | 168,350 QPS |
スレッドプール2+future | 150,285 QPS |
スレッドプール4+future | 145,857 QPS |
スレッドプール8+future | 145,011 QPS |
スレッドプール16+future | 143,245 QPS |
まず、同期処理の結果は、足し算をしているだけになるので、14億QPSと異常に速い。それに比べると、std::launch::deferredの遅延評価をするだけでスループットは223万QPSにまで落ちてしまう。そして、std::launch::asyncのスレッド呼び出しをすると、スループットは6万QPSまで落ちてしまう。とはいえ、毎回スレッドを生成してこのスループットが出るのは、かなり早いとも言える。それでもスレッドプールを使った方が遙かに効率が良く、1スレッドなら241万QPSのスループットを出せるようになる。しかし、スレッドプールを使ってもループ内で同期するとスループットはガクッと落ちて、16万QPSになる。
スレッドプールのオーバーヘッドはスレッド数によって奇妙な変化をする。1スレッドで241万QPSなのが、2スレッドだと137万QPSに悪化する。それ以降は、4で37万、8で43万、16で45万と、スレッド数との相関がほとんどなくなる。タスクキューの追加を通知する際の条件変数の挙動が律速しているような気はしているが、確証はない。一方、futureで同期する場合、スレッド数に関わらず15万QPSくらいのスループットになる。
async+futureのセマンティクスはスレッドープール+futureが完全にカバーするので、可能であればスレッドプールを使うべきだ。元々の処理をバックグラウンドで実行すると考えると、バックグラウンド処理とフォアグラウンド処理の双方が6万QPSよりも遅いならasync+futureを使っても良い。それよりも高速な処理でも、16万QPSよりも遅いなら、スレッドプール+futureを使って良い。短期的な待ち合わせが必要なく、241万QPSよりも遅い処理をバックグラウンドで動かしたいなら、スレッドプールにそのようなタスクを入れてもよい。そうでなければ、直列にやる方がよい。ファイルI/OやネットワークI/Oが絡む場合は並列化すべき条件のどれかに当てはまることが多いので、やはり並列化した書き方はスキルセットとして持っておくべきだろう。
まとめ。asyncとfutureを使ったスレッド生成ベースの非同期処理では、6万QPSくらい出せる。効率が最適とはいえないが、カジュアルなユースケースはこれで十分だろう。とはいえ、スレッドプールを使えば16万QPSもしくは241万QPSに改善するので、可能であればスレッドプールを活用した方が良い。
Tkrzwの非同期APIを実装しようと思ってスレッドプールの実装をしたのだが、C++ではラムダ式で簡単に非同期処理が書けるので、わざわざデータベース側で非同期化の面倒を見る必要はない。よって、スレッドプールそのものをライブラリのユーティリティに含めて、必要であれば使ってねという感じにして済ませようと思う。ただ、PythonやRubyのAPIを考えると、各処理系のスレッド機構やコルーチンのオーバーヘッドを回避しつつ、ネイティブスレッドでの並列処理ができるので、やはり非同期APIを実装してもいいかなという気にもなる。悩ましいところだ。