豪鬼メモ

一瞬千撃

マルチシャードのトランザクション

TkrzwにはShardDBMというクラスがあり、シャーディングで分割した複数のデータベースファイルをあたかも単一のデータベースであるかのように透過的に扱うことができる。複数マシンに分散しないローカル環境でも、シャーディングを施すと、並列処理性能や再構築処理等の効率の点で有利な点が多々ある。レイテンシが重要課題となるオンライン系での運用では特にシャーディングは有用だ。シャーディングによって複数のデータベースファイルが関わるとなると、トランザクションの独立性(Isolation)をどうやって実現するかが課題となる。今回は、トランザクションをコールバック関数の連続呼び出しとしてモデル化したProcessMultiメソッドと、任意の外部処理とCompare-and-Swapによって実現するためのCompareExchangeMultiメソッドの実装について説明する。


Tkrzwの各データベースクラスでは、複数のレコードに対する参照操作と更新操作をアトミックに行うProcessMultiメソッドを提供している。キーとコールバック案数のペアのリストを与えると、リスト内の全てのキーのレコードをロックした状態で、各々のコールバック関数を順番に呼び、各々の戻り値でレコードの値を更新してくれる。

ShardDBMが内部に持つ複数のデータベースオブジェクトをシャードと呼ぼう。各シャードは、ACID特性を満たしたトランザクションを実現できるProcessMultiメソッドを実装している。しかし、各々が内部に持っているロック機構は外部に公開されていない。つまり、複数のシャードに渡るロックを同時に行うことはできない。ロックを外部に公開することは技術的には可能だが、それをやってはカプセル化のポリシーが崩壊するので、許容できない。シャード間でロックを共有することができないというのは前提条件だ。その制約の中で、どうやって排他制御を行うか考えねばならない。言い換えれば、外部ロックに頼らずに、どうやってトランザクションの独立性を実現するかを考えなければならない。

現状では、全てのスレッドが、一貫性の必要なトランザクションをProcessMultiを使って行う限りにおいては、独立性を保証できるようになっている。つまり、複数のProcessMultiが同じレコードに同時に実行されないように最小限のロックを掛けるようになっているが、それ以外のGetやSetやRemoveはその制約に律されずに実行される。

f:id:fridaynight:20210615214409p:plain

ProcessMulti同士の排他制御をどうやって実現しているか説明する。各シャードは内部にロック機構を備えていて、キーに対応するロックで保護されたクリティカルセクションの中でコールバック関数を呼ぶことができる。キーに対応するということは、必ずしもそのデータベース内に存在するレコードのキーを使うことを意味しない。存在しないレコードのキーもロックできるということだ。これを利用して、常に0番のシャードで該当の全てのキーをロックした上で、各々のコールバックを呼ぶ。コールバックの中では、0番のシャードに該当する操作であればその場で処理し、そうでなければ他のシャードのProcessメソッドを呼んで処理を移譲する。

ロック待ちでスレッドがブロックする確率はシャード数で割られて低減するのが理想だが、この実装だと常に0番をロックするので、シャーディングによって並列性が上がることはない。トランザクションの一貫性が必要ない操作はProcessMultiを呼ばずに実行でき、その場合には並列性は上がるが、どうにも中途半端な仕様になっていることは否めない。とはいえ、シャードを外部からロックせずに任意のコールバックを呼ぶにはこの方法しかない。


マルチレコードCASを実現するCompareExchangeMultiメソッドは、DBMクラスにおけるMix-in実装としてProcessMultiを使って実装されている。よって、ShardDBMクラスでオーバーライドしなければ、ShardDBMのProcessMultiの特性を引き継いだマルチレコードCASが実現する。すなわち、ProcessMultiやCompareExchangeMultiを使った他のスレッドのトランザクションに対しては独立だが、それ以外の操作からの独立性は保証されないという実装になる。それでも実用にはなるだろうが、マルチレコードCASは並列計算の要なので、より制約の少ない汎用的な実装を提供したいところだ。

「レコードAの値がOで、かつレコードBの値がPである時に、レコードCの値をQにし、かつレコードDの値をRにする」といった、複数レコードにまたがった複数の参照操作と更新操作をアトミックに行うのがマルチレコードCASである。そのためには、このトランザクションに関係するレコードA、B、C、Dをロックした上で、アトミックに全ての操作を実行せねばならない。ProcessMultiを使えばこれを簡単に実現できるが、ProcessMultiの実装に制約があって、それを引き継ぎたくない場合には、ShardDBM独自の実装でオーバーライドする必要がある。

シャーディング環境下では、任意のコールバックをアトミックに実行するProcessMultiを並列性と独立性を低下させずに実装するのは不可能だ。上述したように、0番シャードの空間でロックを行ってProcessMulti間の独立性のみを担保することは可能だ。あるいは全シャードの全体をロックして並列性を犠牲にする実装も容易に思いつくが、並列性を犠牲にしたらシャーディングした意味がない。少なくともシャーディングしていない状態よりは並列性が高くないと存在価値が疑われる。

全てのメソッドに対して独立で、かつロックの範囲を最小化したマルチレコードトランザクションの実装はできないものか。またも世田谷公園を走りながら検討したところ、用途をマルチレコードCASに限れば実装できることが分かった。おさらいになるが、マルチレコードCASは以下の操作からなる。

  • トランザクションに関わるキーを全てロックし、トランザクションの最中の更新と参照を防ぐ
  • CASの事前条件となるレコードの値を調べ、条件に合わないレコードが一つでもあれば失敗する
  • CASの事後条件となる更新操作を順次適用する
  • ロックを解除する

シャーディングされた個々のデータベースは、ProcessMultiメソッドにより、自分が持っているレコードをロックしてコールバックを呼べるが、その他のデータベースのロックは直接操作できない。しかし、コールバックの中で他のデータベースのProcessMultiメソッドを呼ぶことはできる。ということは、コールバックをシャード毎にまとめてから、連鎖式にProcessMultiを呼ぶことで、複数シャードにまたがる全てのレコードに対するアトミックな処理を実装することができそうだ。

f:id:fridaynight:20210615214944p:plain

実装の詳細を説明する。まず、CASの事前条件および事後条件をシャード単位でまとめる。各シャードのインデックスをキー、そのシャードの事前条件のリストと事後条件のリストのペアを値とする連想配列を作る。連想配列はstd::mapで表現するので、関連するシャードのインデックスは昇順でアクセスできる。デッドロックを防ぐには、常に一定の順序でロックを行うことが肝要だ。

std::mapの各要素は、事前条件か事後条件のどちらかまたは両方を1つ以上持っているはずだ。それらを、そのシャードにおけるCASを実装したコールバック関数に変換していく。その際、各シャードにおいて、事前条件の最後のコールバックの最後か、事後条件の最初のコールバックの最初に、次のシャードのCASを呼ぶトリガーをしかける。結果として、全てのキーの事前条件を確認する処理が連鎖しながら最後のシャードまで到達して、全ての条件が合致した場合、最後のシャードで事後条件が設定されてCASが成立する。そうすると処理は呼び出し側に順に戻りながら、各シャードで事後条件の設定が順に行われる。最初のシャードの事後条件の設定が終わったならば、全体のCASが完了したことになる。

自分でも説明を書いていて混乱するほどに、わかりにくい。JavaScriptのイベント処理に例えるとマシになるかもしれない。イベントキャプチャに相当するのが事前条件の検査で、イベントバブルに相当するのが事後条件の適用だ。イベントキャプチャフェーズで不整合が検出されたら、残りの処理は省略される。ターゲットフェーズまで到達して不整合がなければ、その場でCASが成立し、同様にイベントバブルフェーズの全てでCASが成立する運びになる。

CASがこのように実装できるのは、事前条件をどの順序で調べても同じ結果になり、また事後条件をどの順序で設定しても同じ結果になることが分かっているからである。シャード毎に実行順序を組織化することで、排他制御を単純な入れ子構造に変換できる。ここで重要なのは、ProcessMultiと違って、常に0番シャードがロックされるわけではないということだ。トランザクションに関わるキーが属するシャードの、実際に属するキーだけがロックされる。この方法が、マルチシャードのトランザクションの並列性を最大化する理想形であろう。

CASは並列計算の基本である。CASが実現できれば、どんなトランザクションも並列に実行できる。よって、ShardDBMは、ProcessMultiの実用性はイマイチでも、トランザクショナルなデータベースとして利用できる。CAS以外の部分は完全に並列処理できるはずなので、CASの並列性が全体の並列性を律する。CASだけでも効率的な実装ができたのは素晴らしい。図ではたくさんのレコードをロックしているが、実際のユースケースでは2個か3個しかロックしないので、スレッドがブロックする確率はかなり低いはずだ。


ShardDBMはC++からだけでなくJava/Python/Rubyからも利用できる。諸事情でコールバックが必要な機能は実装していないので、ProcessやProcessMultiは実装していないが、CASを司るCompareExchangeやCompareExchangeMultiは利用できる。例えばPythonだとこんな風に使う。

import tkrzw

# DBMを作って、5シャードに分割したデータベースファイルを開く
dbm = tkrzw.DBM()
dbm.Open("casket.tkh", True, num_shards=5, num_buckets=10000)

# 日本と中国の首都を登録
dbm.Set("japan", "tokyo")
dbm.Set("china", "beijing")

# 首都の位置をアトミックに入れ替えちゃう
expected = [("japan", "tokyo"), ("china", "beijing")]
desired = [("japan", "beijing"), ("china", "tokyo")]
print(dbm.CompareExchangeMulti(expected, desired))

# 結果の表示
print(dbm.GetStr("japan"))
print(dbm.GetStr("china"))

# DBMを閉じる
dbm.Close()

Javaはネイティブスレッドを使うので、スレッドをC++と同様の並列性で実行できるわけで、ぜひともスレッドプログラミングをして並列性を追求してほしい。PythonRubyスクリプト自体のスレッド並列性が低いが、Tkrzwの機能はグローバルインタープリタロックの外側で動くように実装されているので、マルチスレッド化する恩恵がそれなりにはある。マルチスレッドを使うなら排他制御は最重要課題になるので、それが気軽にできるCompareExchangeMultiはめちゃめちゃ便利だ。



まとめ。シャーディングを施したデータベースを透過的に扱えるShardDBMクラスでも、マルチレコードのコールバックによるトランザクションを司るProcessMultiメソッドと、マルチレコードのCASを司るOompareExchangeMultiメソッドを実装した。num_shardsってパラメータを1個つけるだけでシャーディングできるし、シャーディング環境下でもマルチレコードトランザクションが発行できる。並列処理とかスループットとかを考えない人には何のことやらって感じだろうけども、考える人にとっては、「これが欲しかったんだ」と感じてもらえるのではないか。