豪鬼メモ

一瞬千撃

非同期APIでデータベースを操作する

データベースライブラリの既存の同期的なAPIをラップして、非同期のAPIを作ったという話。C++のstd::futureクラスと独自のスレッドプールを使っている。std::futureのおかげで、スレッドプログラミングの知識が全く無いプログラマでも簡単に非同期処理機能を利用できる。スレッドプールは隠蔽されているが、おかげで低いオーバーヘッドで非同期処理が実現できる。この機能はTkrzw-0.9.43から利用できる。
f:id:fridaynight:20210715221112p:plain


まずは典型的なユースケースを紹介したい。データベースを最適化するRebuildというメソッドがあるが、これはデータベース全体の再構築を行うので、それなりに時間がかかる。それを同期的に呼び出すと、スレッドがブロックすることになる。それが嫌な場合、再構築処理を非同期APIで呼び出して、その完了を待っている間に別の処理を行うとよい。ここでは、「処理中です」というメッセージを1秒に1回の頻度で端末に表示することにしよう。

// 巨大なデータベースがあったとして、それを書き込みモードで開く
PolyDBM dbm;
dbm.Open("casket.tkh", true);

// 10個のワーカースレッドを使うスレッドプールを持った非同期APIを準備
AsyncDBM async(&dbm, 10);

// 再構築処理をバックグラウンドで動かして、futureを受け取る
std::future<Status> result = async.Rebuild();

// 再構築処理が終わるまで待つが、1秒ずつタイムアウトしてメッセージを出す
while (result.wait_for(std::chrono::seconds(1) != future_status::ready)) {
  std::cout << "処理中です" << std::endl;
}

// 結果の成否を表示する
const Status status = result.get();
if (status.IsOK()) {
  std::cout << "成功" << std::endl;
} else {
  std::cout << "失敗: " << status << astd::endl;
}

"1"から"100"まで増加する数値のキーを持つレコードの値を並列に取得するなら、以下のようなコードになる。

// データベースを読み込みモードで開く
PolyDBM dbm;
dbm.Open("casket.tkh", false);

// 10個のワーカースレッドを使うスレッドプールを持った非同期APIを準備
AsyncDBM async(&dbm, 10);

// Getの結果はStatusと文字列のペアなので、それをラップしたfutureを要素に持つ配列を準備
std::vector<std::future<std::pair<Status, std::string>>> results;

// 100個のレコードの検索を開始して、その結果を受け取るfutureを格納
for (int32_t i = 1; i <= 100; i++) {
  const std::string key = ToString(i);
  results.emplace_back(async.Get(key));
}

// 結果を取得して表示する
for (auto& result : results) {
  std::cout << result.get().second << std::endl;
}

コードを一瞥するだけで使い方がだいたい分かる簡潔さだ。AsyncDBMというクラスに既存のDBMのオブジェクトを渡すと、非同期APIのハンドルが作られる。その中ではスレッドプールを内蔵したタスクキューが管理されるのだが、呼び出し側ではそんなことは気にしなくていい。ワーカースレッドの数は、起動したいバックグラウンド処理の並列数に合わせるべきだが、多くても別に困らないので、とりあえず10個でOK。

通常の同期APIとほぼ同じ使い方で非同期APIも使えるのだが、戻り値がstd::futureでラップされているところが違う。std::futureのgetメソッドを呼ぶと、該当の処理の終了を待った上で、戻り値が取得できる。戻り値を取得せずに終了を待ちたい場合は、waitメソッドを呼ぶ。タイムアウト付きで待ちたい場合にはwat_forメソッドを呼ぶ。

操作の終了を待ちたくないし処理結果を知る必要もないという場合には、単に戻り値のfutureを捨ててしまっていい。無視されたfutureがあっても、AsyncDBMオブジェクトが破棄される際には全てのバックグラウンドタスクの待ち合わせが行われてリソースの解放が行われる。したがって、AsyncDBMオブジェクトの生存期間は、生成時に渡したDBMオブジェクトより短い必要がある。

同期APIの基本的なメソッドは、相当するものが非同期APIでもサポートされる。具体的な仕様はAPI文書をご覧いただきたい。多数のレコードを一気に取得するのであればGetMultiメソッドが便利だし、多数のレコードを一気に書き込むのであればSetMultiメソッドが便利だ。

なお、イテレータは非同期APIではサポートしない。なぜなら、Nextを繰り返し呼ぶのがイテレータの本分であるから、それを非同期的にやっていては遅くてしょうがない。しかし、ツリー系の順序ありデータベースで範囲検索をしたい場合には、イテレータが使えないと困る。そういう場合は同期APIイテレータを使えばいいのだが、範囲検索も非同期的にやりたい場合もあるだろう。そんな時は、SearchModalメソッドを使うとよい。範囲検索を含む様々なモードの検索を行って、該当するキーの一覧を返してくれる。例えば "upper" モードは、指定したキーより上(つまり後ろ)の位置にあるキーの一覧を、指定した数だけ返してくれる。データベースの全てのレコードをキーの昇順で取り出す処理を非同期で行うには、以下のようにすればよい。

// ツリーのデータベースを開いて、非同期APIを準備
tkrzw::PolyDBM dbm;
dbm.Open("casket.tkt", false);
tkrzw::AsyncDBM async(&dbm, 4);

// 空文字列を起点にする
std::string last_key = "";
while (true) {
  // 非同期に検索を開始する。結果は最大1000個受け取る
  auto search_future = async.SearchModal("upper", last_key, 1000);
  // フォアグラウンドでは、処理中である旨をお知らせする
  while (search_future.wait_for(std::chrono::seconds(1) != future_status::ready)) {
    std::cout << "キーの検索中です" << std::endl;
  }
  // 結果を取り出す
  auto [status, keys] = search_future.get();
  // 結果が失敗であったり空だったりすれば、ループから抜ける
  if (status != tkrzw::Status::SUCCESS || keys.empty()) {
    break;
  }
  // 取り出したキーを使って、非同期に値を取り出す処理を開始する
  const auto get_future = async.GetMulti(keys).get();
  // フォアグラウンドでは、処理中である旨をお知らせする
  while (get_future.wait_for(std::chrono::seconds(1) != future_status::ready)) {
    std::cout << "値の取得中です" << std::endl;
  }
  // 結果を取り出して表示する
  const auto& records = get_future.get();
  for (const auto& record : records) {
    std::cout << "key=" << key << "  value=" << value << std::endl;
  }
}

コードは幾分長くなったが、非同期処理の割にはかなり簡潔に書けているだろう。なんだかんだ言って、C++ってば結構いけてる言語だ。


前回議論した通り、スレッドプールを使った実装はstd::asyncで毎回スレッド生成を行う実装よりも効率的だ。スレッドプール内蔵タスクキューがTaskQueueというクラスとして提供されるので、それを使えば任意の処理を非同期で行うことができる。だから、わざわざデータベースのAPIと統合しなくても、非同期処理は実装できる。

にもかかわらず、なぜデータベースの操作に特化した非同期APIを提供したのか。それは、タスクキューという概念をプログラマに意識させてしまうと、プログラミングが面倒になるからだ。冒頭に挙げた図に示した概念を理解した上で、バグを混入させないように注意深く実装作業を進めなければならない。例えば、Setメソッドを非同期に実行するには、Setメソッドを呼ぶための関数と、その関数の中で使われるキーと値の文字列のデータをパッケージしたオブジェクトを、タスクキューの中に追加する必要がある。ラムダ式を使えば関数オブジェクトの生成は簡単にできるが、その中で使うキーや値の文字列オブジェクトの寿命をどうやって管理するかを考えなければならない。経験の浅いプログラマは、ラムダ式がキャプチャしたオブジェウトをそのまま使おうとしてしまうかもしれない。そうすると、単にキャプチャされたオブジェクトの生存期間は伸ばされないので、死んだオブジェクトを参照してバグるだろう。よって、std::bindでパラメータとして束縛するなどの工夫が必要になるのだが、元々の同期APIに比べてどうしても読みにくいコードになってしまう。

非同期処理の方法論の中でも、futureを使って短期的に待ち合わせをする方法は、かなり同期処理寄りの書き方になる。並列化している区間がfutureオブジェクトのスコープに限定される傾向があるからだ(futureを破棄したりコンテナに入れて持ち回すことも可能だが)。よって、非同期処理の中ではfutureの方法論は難易度が低い。futureと好対照の方法論は、終了時のコールバックを呼ぶものだ。JavaScriptだとコールバックがよく使われて、それはそれで格好いいのだが、イベント駆動ではないプログラムでコールバックを多用すると無駄に複雑になってしまう。並列化する区間を限定するべく待ち合わせの処理を書く羽目になるくらいなら、待ち合わせを前提にしているfutureを使い倒した方が楽だ。


まとめ。データベースライブラリに非同期処理の機能を組み込んだことにより、メインスレッドをブロックさせないプログラミングスタイルが簡単に実現できるようになった。非同期APIは、インタラクティブなアプリケーションやレイテンシの要求がシビアなサーバサイドの処理に使うのに向いている。