豪鬼メモ

一瞬千撃

DBMでメッセージキューイング その0 準備

単純なkey-valueストレージであるDBMを使ってメッセージキューを実現するにはどうすればよいかという思考実験をした上で、実装もしてしまおうという企画。初回は、機能要件と要素技術を検討する。
f:id:fridaynight:20211017031155p:plain


メッセージキューは分散処理のミドルウェアとしてよく使われる。処理に一定の時間がかかるタスクが大量にある場合、そのタスク指示をキューに入れていき、それを順番に取り出して処理する。キューを挟むことで、タスクを生成する側(プロデューサ)も、タスクを処理する側(コンシューマ)も、独立したプロセスやサービスとして実装できるようになる。プロデューサも複数あってよいし、コンシューマも複数あってよい。メッセージキュー自体がサービスになっていれば、全体を疎結合で連携させられる。

メッセージキューを本格的に使いたい場合、専用の製品がプロプライエタリでもフリーソフトウェアでも数多あるので、その中の一つを選ぶのが良いだろう。一方、ちょっとしたタスクを分散して実行したい場合には、DBMのサービスであるTkrzw-RPCでも十分に役割を果たす。この企画では、仕様を絞って、最も単純なメッセージキューイングサービスを実装する。

メッセージキューは、キューなので、いわゆるFIFO(first-in-first-out)構造だ。つまり、先に入れたメッセージが先に取り出される。これを実現するには、順序付きのデータベースに、インクリメンタルなIDをキーにしたレコードを入れれば良い。"00000001", "00000002", "0000003" といったキーでレコードを入れていけば、イテレータはその順番にレコードにアクセスできる。しかし、連番を振るには、今の番号をどこかに記録しておかねばならないが、それはダルい。そのデータが失われると厄介だからだ。一方、メッセージの生成時刻を使う方法も考えられる。DBMのキーは重複できないので、重複する可能性のある時刻情報からキーを生成する場合、重複回避の処理が必要になる。とはいえ、ナノ秒単位で扱えば重複確立は非常に低いので、重複回避もビジーループで適当に実装すれば問題ないだろう。タイムスタンプはビッグエンディアンのバイナリデータとして表現する。そうすると、デフォルトの比較関数であるバイト列の辞書順で評価すれば、FIFOが実現できる。

Status Enqueue(TreeDBM* dbm, std::string_view value) {
  for (uint64_t seq = 0; true; seq++) {
    // 10ナノ秒単位でタイムスタンプを作るが、精度が低い場合にもseqを足すことで
    // 重複確率を下げる
    const uint64_t timestamp = GetWallTime() * 100000000 + seq;
    const Status status = dbm->Set(IntToStrBigEndian(timestamp), value, false);
    if (status != Status::DUPLICATION_ERROR) {
      // 成功するか、重複以外のデータの場合は、処理を終えて状態を返す
      return status;
    }
  }
  return Status(Status::UNKNOWN_ERROR);  // ここには到達しない
}

データベースの中から最初のレコードを取り出せば、最も古く投入されたタスクを取り出せることになる。外部イテレータを使えばこれは実現できる。イテレータを作って(MakeIterator)、最初の位置に飛ばして(First)、現在位置のレコードを取り出して(Get)、現在位置のレコードを消す(Remove)という一連の操作を行えばよい。この考え方で実装すると、以下のようになる。

Status Dequeue(TreeDBM* dbm, std::string* value) {
  auto iter = dbm->MakeIterator();
  iter->First();
  const Status status = iter->Get(nullptr, value);
  if (status == Status::SUCCESS) {
    return status;
  }
  return iter->Remove();
}

しかし、この実装には欠陥がある。並列に呼び出されると、同じレコードを重複して処理してしまうからだ。並列性を考えるなら、そもそも、メソッドが別れているのが面倒くさい。最初のレコードを取り出しつつ消すという、PopFirstというメソッドがあった方が便利だ。よって、DBMインターフェイスにそれを加え、全てのDBM派生クラスで実装しておいた。これを使うと、タスクの取り出しは以下のように単純化できる。排他制御は内部でよしなにやってくれる。

Status Dequeue(TreeDBM* dbm, std::string* value) {
  return dbm->PopFirst(nullptr, value);
}

1つ以上のタスクがデータベース上に投入されていれば、上記のDequeueは成功する。タスクを処理した後にまた次のタスクを取り出すという手順を繰り返すことで、全てのタスクが処理される。タスクが1つもなかった場合は、Dequeueは失敗する。その場合、クライアントは、適当な間隔を置いてから、またDequeueを呼ぶ。いわゆるポーリング方式だ。単純なポーリング方式の場合、クエリが失敗してから次にクエリを呼ぶまでの間隔が、タスクが処理されるまでのレイテンシの期待値に反映される。よって、ポーリング間隔を短くしたいが、そうすると、タスクが存在しない場合にもシステムに高い負荷がかかることになるので、極端に短くすることはできない。

Cometなどに見られる、いわゆるロングポーリングという方式は、単純なポーリング方式の問題点を緩和してくれる。クライアント側でのポーリング待機間隔はゼロにして、サーバ側でキューの状態変化を待つのだ。そのためには、以下のようなDequeueのラッパーを書く。タスクが空だった場合にのみ、通知を待機する。

Status WaitAndDequeue(TreeDBM* dbm, SignalBroker* broker,
                      std::string* value, double timeout) {
  double deadline = GetWallTime() + timeout;
  while (true) {
    status = Dequeue(dbm, value);
    if (status != Status::NOT_FOUND_ERROR) {
      return status;
    }
    // 通知を待機するが、タイムアウトしたら抜ける
    double time_diff = deadline - GetWallTime();
    if (time_diff <= 0 || !broker.Wait(time_diff)) {
      break;
    }
  }
  return Status(Status::INFEASIBLE_ERROR);
}

Enqueueでタスクを加えた後には、スレッド間で共有されたSignalBrokerのSendメソッドを読んで通知を行えばよい。

Status EnqueueAndNotify(TreeDBM* dbm, SignalBroker* broker, std::string_view value) {
  const Status status = Enqueue(dbm, value);
  if (status != Status::SUCCESS) {
    return status;
  }
  // パラメータがtrueの場合、notify_allで、単一スレッドによみ通知を行う
  broker.Notify(true);
  return Status(Status::SUCCESS);
}

タスクの追加と通知はアトミックに行わない。アトミックにやろうとするとmutexでロックされた中でタスクの追加と通知を行うことになるが、それは非効率だ。タスクの追加というI/Oが並列化できないのも困るし、通知された側がすぐにロックを獲得できないので一旦ブロックされるのも良くない。よって、通知はタスクを追加した後に行う。しかし、この方法にも欠陥がある。以下のケースでデッドロックするからだ。

  • スレッドA:Dequeue失敗
  • スレッドB:Enqueue
  • スレッドB:Notify
  • スレッドA:Wait

実際にはタイムアウトがあるからデッドロックはしないのだけど、タイムアウトまでロックするので、やはり許容できない。問題は、スレッドBがNotifyした時には誰も待機していないので誰も通知を受け取らないが、その後にスレッドAが待機状態になることだ。通常の条件変数のイディオムでは、「Enqueue」「DequeueとWait」を同一のmutexで保護するして排他的にすることで、この問題を回避する。しかし、EnqueueやDequeueをクリティカルセクションに入れると、DB操作の並列性がなくなってしまう。

この問題の本質は、コンシューマ側のDequeueとWaitの間に、プロデューサ側のNotifyが割り込み得ることにある。これを回避するには、DequeueとWaitのシーケンスを共有ロックで保護して、Notifyを排他ロックで保護すれば良い。Enqueueはロックの外なので並列化できるし、Dequeueは共有ロックなので並列化できる。Notifyだけが排他ロック内で実行されるが、それは一瞬で終わるので性能には影響しない。この方式を簡単に実装できるように、SignalBrokerというクラスを実装しておいた。これを使うと、WaitAndDequeueの実装は以下のようになる。

Status WaitAndDequeue(TreeDBM* dbm, SignalBroker* broker,
                      std::string* value, double timeout) {
  double deadline = GetWallTime() + timeout;
  while (true) {
    // 共有ロックとその解放を変数スコープに関連付ける
    SignalBroker::Waiter waiter(broker);
    // キューからタスクを取り出し、成功すれば抜ける
    status = Dequeue(dbm, value);
    if (status != Status::NOT_FOUND_ERROR) {
      return status;
    }
    // タスクがない場合、通知を待機するが、タイムアウトしたら抜ける
    double time_diff = deadline - GetWallTime();
    if (time_diff <= 0 || !broker.Wait(time_diff)) {
      break;
    }
  }
  return Status(Status::INFEASIBLE_ERROR);
}

このSignalBrokerの実装にはコツがある。共有ロックされた状態で排他ロック内のNotifyを実行することはできないので、通知を受け取るその瞬間だけは共有ロックを外さねばならない。これは条件変数が通知を待つ際にunique_lockの排他ロックを一時的に解除するのと同じ理屈だ。よって、WaiterオブジェクトのWaitメソッドは以下のような実装になる。

inline bool SignalBroker::Waiter::Wait(double timeout) {
  // 締め切りの時刻を計算する
  const auto deadline = std::chrono::steady_clock::now() +
      std::chrono::microseconds(static_cast<int64_t>(timeout * 1000000));
  // 条件変数のmutexをロックする
  std::unique_lock<std::mutex> lock(broker_->mutex_);
  // 待機スレッドの数を増やす
  broker_->wait_count_++;
  // 待機スレッドの数は通知によってリセットされるので、それを待つ
  while (broker_->wait_count_ != 0) {
    // 共有ロックを一時的に外して、通知を待機する
    broker_->notify_mutex_.unlock_shared();
    auto wait_rv = broker_->cond_.wait_until(lock, deadline);
    broker_->notify_mutex_.lock_shared();
    // タイムアウトした場合、抜ける
    if (wait_rv == std::cv_status::timeout) {
      // タイムアウトと同時に通知が来ていた場合、成功を返す
      if (broker_->wait_count_ == 0) {
        return true;
      }
      // そうでない場合、待機スレッドの数を減らして、失敗を返す
      broker_->wait_count_--;
      return false;
    }
  }
  return true;
}

また、通知はnotify_allで待機スレッドの全てに行う。なぜnotify_oneで単一スレッドを起こすだけでは駄目かというと、以下のようなケースがあるからだ。

  • スレッドA:Wait
  • スレッドB:Wait
  • スレッドC:Enqueue
  • スレッドD:Enqueue
  • スレッドC:Notify
  • スレッドD:Notify
  • スレッドA:Dequeue

ここでは、Enqueueは2回行われるが、Dequeueは1回しか行われない。Waitの中で通知を待機している状態では、共有ロックは外れているので、その間に連続して複数のNotifyが割り込むことは有り得る。連続した複数のNotifyは1回のNotifyと変わらない。よって、毎回のNotifyで全ての待機スレッドを起こさないと、通知を取りこぼす可能性がある。notify_allによって起こされたコンシューマ側のスレッドは、Enqueueされたタスク数を超えた分は無駄にビジーループすることになる。とはいえ、無駄にビジーループしているってことは暇だということなので、実用上の問題はない。


以上の議論により、単純なタスクキューを実現するための要素技術は揃ったと言える。ところで、単純なユースケースではこれで良いが、エラー処理を真面目に考えると、これでは不足する場合もあるだろう。タスクを処理しているコンシューマが、タスクを正常に処理する前に死ぬかもしれない。分散処理に参加する機材が増えると、1台以上が故障する確率は台数に比例して上がってくるので、100台とかになるとエラー処理を真面目に考えねばならない。しかし、GetとRemoveを同時に行うPopFirstだと、取り出したメッセージは消えてしまい、再試行できない。

ならば、メッセージを取り出すと同時に、削除するのではなく、「処理中」を意味する状態に遷移させれば良い。同一データベース内で名前空間を区切った領域にデータを移してもよいし、別のデータベースにデータを移しても良い。そうして待避しておいたデータは、タスクが成功した場合には、コンシューマ側からの要求を受けて、削除する。タスクが失敗したり、コンシューマ側が死んだ場合には、待避データは残り続ける。定期的に待避データをキューに戻せば、再試行が成立する。

さらに言えば、一つのデータベースで複数のキューを扱いたいというユースケースもあるだろう。その場合、名前空間を接頭辞にして、タイムスタンプを接尾辞につけて、名前空間毎にFIFOを実現することになるだろう。その場合にはPopFirstメソッドでは機能が足りないので、専用のメソッドを実装することになるだろう。単一のタスクを複数のコンシューマが処理するユースケースも考えられる。その場合、処理された数を数えるメタデータを別途管理することが必要になるだろう。

現状のTkrzw-RPCの枠組みの中では、再試行や複数キューや処理数のカウントは実装すべきではない。それらを実現するとしたら、専用のサービスを書くべきだ。一方で、PopFirstメソッドとSignalBrokerクラスだけで実装できる範囲ならば、現状のTkrzw-RPCに入れてもよいだろう。おそらく、PushLastとかいうメソッドを追加して対応をとることになるだろう。次回以降にその具体的な設計と実装を進める。