豪鬼メモ

一瞬千撃

DBMでメッセージキューイング その2 CompareExchangeとモニタリング

以前の記事で述べたように、Tkrzw-RPCのサービスはメッセージキューとしても用いることができる。それをさらに進展させて、任意のレコードの状態変化をリアルタイムに監視して処理できる仕組みを、CompareExchangeメソッドを使って実装した。


データベースをメッセージキューとして用いる場合、一つのデータベースが一つのキューとして扱われる。よって、複数のキューを扱いたい場合には、複数のデータベースを運用する必要がある。Tkrzw-RPCのサーバは単一プロセスで複数のデータベースを同時に運用できるので、キューの数が100個くらいまでなら、データベースを100個作ることで対応できる。多分1000個でも動くけど、メモリ使用量との相談になる。

ところで、メッセージキューを介して分散処理を行う場合、その結果を処理するにあたって、以下の3つのパターンが考えられる。

  • タスクを実行するワーカ側(コンシューマ)がその結果について責任を持つ。
    • テータベースに書き込んだり、ファイルを生成したり、外部のユーザに通知したりする
  • タスクの実行結果を別のキューに入れて、それをまた別のワーカが処理する
    • タスクの結果は非同期的に処理されることになる
  • タスクを依頼するドライバ側(プロデューサ)が、タスクの結果を監視して、結果が得られ次第それを処理する
    • タスクの結果を同期的に処理できる

3番目のパターンに着目しよう。プロデューサは、タスクにIDをつけて、それをメッセージに入れる。ワーカはタスクを実行して、その結果をデータベースに格納する。その際に、レコードのキーはタスクIDから生成する。プロデューサは、タスクを投入してから、結果用のデータベース内のタスクIDのキーのレコードを監視し、そこのレコードが書き込まれた瞬間に、それを検出して後処理を行う。この仕組みがあれば、メッセージキューを使ってタスクの分散実行をしつつも、同期的に結果を処理することができる。

下記のイラストを参照されたい。タスクキューの実現にはPushLastおよびPopFirstというメソッドを使っていて、そいつらは入力順とデータベース内の並びが一致するキーを自動生成することで、FIFOを実現している。一方で、結果データベースは、個々のタスクIDから決定されるキーを用いるので、キューではない。個々のドライバは自らが投入したタスクの結果だけをキーを使って監視できるので、同期的なドライバとしての主導権を離さずに、後処理を続けることができる。
f:id:fridaynight:20211106113002p:plain

上記を実現するには、データベースの特定のレコードの変化を監視する仕組みが必要だ。さらに分割して考えると、レコードを更新するメソッドに通知機能をつけ、またレコードの内容を取得する機能に更新を待機する機能をつける必要がある。しかし、Tkrzwのデータベースは、SetとGetとRemoveという単純な処理だけを提供するわけではない。数値の参照と更新を同時に行うIncrementメソッドや、任意のコールバックを読んで任意の参照と更新の処理を行うProcessメソッドをサポートしていて、そこでは単純な更新と参照という区別ができない。

そこで、論理的には全てのレコード演算を実現できるCompareExchangeメソッドに着目する。インターフェイスは多少複雑にはなるが、CompareExchangeは、GetもSetもRemoveもIncrementも、その他の全てのレコード演算も実現できるのだ。CompareExchangeにだけ通知機能と待機機能を持たせれば、リアルタイム処理の込み入った要求の全てに答えることができる。

CompareExchnageは、指定したキーの値が指定したデータである場合に、その値を別のデータに置き換える機能だ。例えば、CompareExhcnage("apple", "15", "16") は、キーが "apple" のレコードの値が "15" であるならば、それを "16" に置き換える。CompareExchangeで10進数のインクリメントを実装したいなら、まずGetで値を得て、それを10進数としてインクリメントしたデータを生成して、更新前の値を更新後の値をパラメータに指定してCompareExchangeを呼ぶ。マルチスレッドで同時に同じキーのレコードをCompareExchangeしようとした場合には、先に実行した方だけが成功し、それ以外は失敗する。失敗した場合には再試行するもよし、エラーを通知するも良しだ。更新前の値にnull値を設定すれば、「レコードがない場合」という条件にできるし、更新後の値にnull値を設定すれば、「レコードを消す」という操作を実行できる。これによって、全ての更新操作はComparExchangeで実現できる。

CompareExchangeには、呼び出し時の既存のレコードの値を調べる機能もある。CompareExchange("apple", "15", "16", &value, &found) などとすると、valueには該当レコードの既存の値が格納され、foundには該当レコードの有無が格納される。更新前の値に特異値ANY_DATAを指定すると、「レコードが存在すれば値は問わない」という条件が指定できるし、更新後の値に特異値ANY_DATAを指定すると、「更新を行わない」という指定ができる。これを使うと、Getの代用もできるようになる。

以上の仕様により、CompareExchangeは全てを兼ね備えた究極生命体カーズ様のような存在になる。CompareExchangeさえあれば、機能的には、他のメソッドは不要なのだ。実際には個々のメソッドで性能の最適化を施す余地があるのでCompareExchangeだけにする予定はないが、とにかくCompareExchangeは最強なのだ。それに待機機能と通知機能をつければ、リアルタイム処理の複雑な要求の大半に応えられることになる。

リアルタイムに何かを監視をするには、条件変数(condition variable)の待機と通知の機能を利用するのが定石だ。しかし、不特定多数のレコードを監視するとして、その全てに対応する不特定多数の条件変数を予め用意することは現実的ではない。よって、一つの条件変数が複数のレコードを監視する必要がある。実際には、条件変数を一つだけにすると並列処理性能が出ないので、複数かつ一定数の条件変数を使って、不特定対数の状態を管理することが必要になる。それを実現するのが、SlottedKeySignalBrokerというクラスである。以下のように使う。

// std::stringクラスをキーにしたシグナルブローカ
// 内部的に16個の条件変数を使って並列化する
SlottedKeySignalBroker<std::string> broker(16);

// 通知側のスレッド
void Notify(const std::string& key) {
  broker.Send(key);
}

// 待機側のスレッド
bool Wait(const std::string& key, double timeout) {
  while (true) {
    // 通知をブロックするためにWaiterオブジェクトを作る
    SlottedKeySignalBroker<std::string>::Waiter waiter;
    // 通知が来ない状態で、条件を調べ、条件が満たされれば変える
    if (CheckSomething(...)) {
      return true;
    }
    // 通知のブロックを解除した状態で、タイムアウトまで通知を待機
    if (!waiter.Wait(timeout)) {
      break;
    }
  }
  return false;
}

キーで名前空間を分ける以外、この構造は前回述べたSignalBrokerクラスの使い方と同じだ。状態を調べてから通知の待機を開始するまでの間に通知が来ると見逃すことになるので、その間の通知をブロックするのが要点だ。

もちろん、サーバ内での涙ぐましい努力はAPIの内部に隠蔽されるので、アプリケーションプログラマは通知機構の詳細について知る必要はない。Tkrzw-RPCにおいては、CompareExchangeとSlottedKeySignalBrokerの機能は統合されて、以下のようなインターフェイスで提供される。

Status CompareExchange(
  std::string_view key,      // 操作対象のレコードのキー
  std::string_view expected, // 更新の事前条件。dataがnullptrなら非存在。ANY_DATAなら任意存在
  std::string_view desired,  // 更新の事後条件。dataがnullptrなら削除。ANY_DATAなら更新なし
  std::string* actual,       // 更新前のレコードの値を格納する文字列オブジェクト
  bool* found,               // 更新前のレコードの有無を格納するboolオブジェクト
  double retry_wait,         // 事前条件が合わない場合に、その秒数だけ待機しつつ再試行
  bool notify);              // 事前条件に合致した場合に通知して待機スレッドを起こす

究極のメソッドとは・・・あらゆるメソッドの・・・全ての能力を身に付け全てのメソッドを兼ねる。そしてあの!美しい!なんという輝き!今まで見た何よりもすばらしい・・・・。参照と更新を兼ねてアトミックに行えるだけでなく、待機と通知をも一度に行えるのだ。

進化したCompareExchangeを使えば、タスクの処理結果が格納されるべきレコードを監視して、格納されたら即座に読み出して処理することが可能になる。読み出しと同時に削除も行える。指定した時間内に結果が格納されなかった場合には、タスクを再投入するもよし、エラーを報告するもよしだ。なお、std::string_viewのデフォルトコンストラクタはnullptrを参照する文字列表現を作るので、非存在や削除を指定するにはそれを使うことになる。

std::string result;
Status status = CompareExchange(
  task_id, ANY_DATA, std::string(), &result, nullptr, 60, false);
if (status.IsOK()) {
  std::cout << "RESULT:" << result << std::endl;
} else {
 std::cout << "Error:" << status << std::endl;
}

タスクを実行して処理結果を格納する側では、以下のようなコードを書くだろう。

std::string result = "...";
Status status = CompareExchange(
  task_id, std::string(), result, nullptr, nullptr, 0, true);
if (!status.IsOK()) {
 std::cout << "Error:" << status << std::endl;
}

ちゃんとこのメソッドが動くのか、コマンドラインで確かめてみよう。まずはTkrzw-RPCのサーバを立てる。デバッグログも表示するようにする。

$ tkrzw_server --log_level debug --async

別の端末にてクライアントコマンドを実行し、"abc" というレコードの値があればそれを取り出そうとする。--compexオプションで監視するキーを指定する。--retryオプションの値は、レコードがない場合に最大何秒間待機するかを指定する。

$ tkrzw_dbm_remote_util queue --compex abc --retry 60

この状態では何もabcのレコードがないので、上記のコマンドはブロックするはずだ。その間に別の端末を開いて、abcのレコードを追加して、さらに通知する。

$ tkrzw_dbm_remote_util queue --compex abc --notify hello

投入側のコマンドを実行したその瞬間に、消費側のブロックが解けて、"hello" って出力されるはずだ。これがリアルタイム性というやつだ。サーバの端末に表示されているログを見ると、プロトコル上、どのようなパラメータが渡されているかを確認することができる。

投入側のコマンドでは、暗黙的に事前条件にnullが指定されている。つまり、abcのレコードがすでに存在している場合には、処理に失敗する。そこにさらに--retryオプションで再試行をさせると、既存の値が取り出されて削除され次第、次の値を投入するという手順が実現できる。キーの名前空間で擬似的にキューを実現しているようなものだ。キーabcのレコードに値hop、step、jumpを順に放り込むには、以下のようにする。

$ tkrzw_dbm_remote_util queue --compex abc --notify --retry 60 hop step jump

投入側でも待機が発生することになったので、消費側のコマンドでも通知を行う。

$ tkrzw_dbm_remote_util queue --compex abc --notify --retry 60
hop
$ tkrzw_dbm_remote_util queue --compex abc --notify --retry 60
step
$ tkrzw_dbm_remote_util queue --compex abc --notify --retry 60
jump

進化したCompareExchangeは、C++以外のクライアントライブラリでも対応している。プロトコルは一緒だが、結果のステータスと値のデータを多値返却する都合上、CompareExchangeAdvancedというメソッドを加えることにした。例えばGoの場合はこのようなAPIになる。

func CompareExchangeAdvanced(
  key []byte,
  expected []byte,
  desired []byte,
  retry_wait double,
  notify bool) ([]byte, Status) { ... }

GetやSetやRemoveなどの全てのメソッドに待機と通知の機能をつけて回るのはダルいし複雑になるのを避けたかったので、CompareExchangeだけが待機と通知の機能を備える。しかし、これさえあれば、大抵のデータベース操作と連携してリアルタイム処理を行うことができる。

前回書いた素因数分解のタスクをこのモデルで書き直してみる。Pythonで書こう。まず、1から1000までの整数をタスクとしてキューに入れるとともに、タスクの処理結果を書き出すドライバプログラムdriver.pyは以下のようになる。

import tkrzw_rpc

# データベースに接続
dbm = tkrzw_rpc.RemoteDBM()
dbm.Connect("localhost:1978").OrDie()

# 1から1000までのループ
for num in range(0, 1001):
  # 操作対象を0番データベースに設定
  dbm.SetDBMIndex(0)
  # タスクをメッセージとして投入する
  status = dbm.PushLast(
    num,   # メッセージの内容。strが呼ばれて文字列になる
    None,  # デフォルトのタイムスタンプを使う
    True)  # 通知を送る
  status.OrDie()
  # 操作対象を1番データベースに設定
  dbm.SetDBMIndex(1)
  # タスクが終了したら書き込まれるレコードを取り出す
  status, value = dbm.CompareExchangeAdvanced(
    num,       # レコードのキー
    dbm.ANY_DATA,  # レコードが存在すれば値は問わない
    None,      # 取り出したレコードは削除する
    60,        # 最大60秒待機する
    False)     # 通知は発送しない
  status.OrDie()
  # 結果を表示する
  print("{} = {}".format(num, value.decode("utf-8")))

# 終了のメッセージを送信する
dbm.SetDBMIndex(0)
dbm.PushLast(-1, None, True)
# 接続を切る
dbm.Disconnect().OrDie()

タスクの整数を取り出して素因数分解の結果をデータベースに書き込むワーカブログラムworker.pyは以下のようになる。

import tkrzw_rpc

# 素因数分解の関数
def Factorize(num):
  factors = []
  for div in range(2, num + 1):
    while (num % div) == 0:
      factors.append(div)
      num //= div
  return factors

# データベースに接続
dbm = tkrzw_rpc.RemoteDBM()
dbm.Connect("localhost:1978").OrDie()

# 無限ループ
while True:
  # 操作対象を0番データベースに設定
  dbm.SetDBMIndex(0)
  # 5秒でタイムアウトさせつつメッセージを得る
  record = dbm.PopFirstStr(5)
  if not record: continue
  num = int(record[1])
  # 負数が来たら抜ける
  if num < 0: break
  # 素因数分解を行う
  print("Processing: " + str(num))
  factors = Factorize(num)
  # 操作対象を1番データベースに設定
  dbm.SetDBMIndex(1)
  # 結果をメッセージとして書き込む
  expr = str(factors)
  status, value = dbm.CompareExchangeAdvanced(
    num,       # レコードのキー
    None,      # そのレコードは存在しないことを期待
    expr,      # 計算結果を値として書き込む
    None,      # 待機と再試行はしない
    True)      # 通知を発送する
  status.OrDie()

# 接続を切る
dbm.Disconnect().OrDie()

サーバは、以下のコマンドで立てる。メッセージキューtasks.tktは順序付きであるB+木データベースにして、結果を格納するデータベースresult.tkhはハッシュ表データベースにしてある。

$ tkrzw_server --log_level debug --async tasks.tkt results.tkh

ドライバとワーカをそれぞれ別の端末で立ち上げる。

$ python3 driver.py
$ python3 worker.py

そうすると、ドライバを実行した端末に以下のような素因数分解の結果が印字されているはずだ。

0 = []
1 = []
2 = [2]
3 = [3]
4 = [2, 2]
5 = [5]
6 = [2, 3]
..
995 = [5, 199]
996 = [2, 2, 3, 83]
997 = [997]
998 = [2, 499]
999 = [3, 3, 3, 37]
1000 = [2, 2, 2, 5, 5, 5]


まとめ。CompareExchangeに待機と通知の機能を加えることで、ほぼ全てのレコード操作に対して監視とリアルタイム処理が行えるようになった。裏ではSlottedKeySignalBrokerという謎の仕組みが動いていて、そのおかげて並列性を損なわずに無限の名前空間を監視することができている。

TkrzwのSQLiteやRedisに対する優位性というか独自性を敢えて主張するなら、その並列処理性能にある。Tkrzw-RPCにリアルタイム処理を支援する機能が加わったことで、よりその強みが発揮しやすいようになったのではないか。まあ、使ってくれる人がいればの話だけども。