私が設計したDBMに特徴的な機能である、アトミックなレコード処理について詳しく紹介したい。以前誰かがデータベースのクンフーであると言ってくれた面白い機能だ。
DBMはkey-valueのストレージであり、典型的な操作はGetとSetとRemoveだ。Getは現在の値を取得し、Setはレコードを作るか新たな値を設定し、Removeはレコードを削除する操作だ。シングルスレッド環境では、それらを組み合わせればいかなる操作も実現できる。しかし、マルチスレッド環境ではそうもいかない。例えばATMで銀行口座からお金を引き落とすことを考えよう。夫はコンビニAで、100円入っている口座から、80円を引き落とす操作を行う。同時に、妻はコンビニBで、100円入っている口座から60円引き落とす操作を行う。
- 今の値は100である。
- スレッド1がGetで値100を読み込み、80を引いて、20という値を生成する。
- スレッド2がGetで値100を読み込む、60を引いて、40という値を生成する。
- スレッド1がSetで値20を書き込む。
- スレッド2がSetで値40を書き込む。
結果として、夫は80円を引き出して、妻は60円を引き出したのに、口座に40円も残っている。錬金術だ。これは典型的なレースコンディションのシナリオで、読み込みと書き込みがアトミックに行えていないことに起因する。ならばmutexを使った排他制御をすればいいじゃないかという話になるのだが、誰がそのmutexを管理するのか。アプリケーションに排他制御の責任を持たせるのはバグの元だし、仮に実装できたとしても、そんなのをメンテしたい奴は居ない。そもそもDBMで口座管理を実装する奴は居ないだろうが。ともかく、データベース層でレコード単位のアトミックな演算ができると便利なことが多い。そうすれば、アプリケーションの実装がとても単純になるのだ。我らがDBMはその機能を提供する。上記の引き落としの処理を実装するには、このようなコードを書けば良い。
// 口座の金額を変える。account_idは口座番号、deltaは増減の金額。 Status ProcessAccount(DBM* dbm, std::string_view account_id, int64_t delta) { // 口座プロセッサのステータス Status proc_status(Status::SUCCESS); // 口座の金額を変えるプロセッサ class AccountProcessor : public DBM::RecordProcessor { public: // コンストラクタ。ステータスオブジェクトのポインタと、増減させる金額を受け取る。 AccountProcessor(Status* status, int64_t amount) : status_(status), delta_(delta) {} // 該当の口座がある場合に呼ばれる。keyとvalueは既存のキーと値の文字列。新しい値を文字列として返す。 std::string_view ProcessFull(std::string_view key, std::string_value value) { int64_t num_value = StrToInt(value) + delta_; if (num_value < 0) { status_->Set(Status::INFEASIBLE_ERROR, "金が足りねーよ!"); return NOOP; // 値を変更しない特別値。 } new_value_ = ToString(num_value); // 返す文字列はProcessが終わるまで生存させるべし。 return new_value_; } // 該当の口座がない場合に呼ばれる。 std::string_view ProcessEmpty(std::string_view key) { status_->Set(Status::NOT_FOUND_ERROR, "そんな口座ねーだろ!"); return NOOP; // 値を変更しない特別値。 } private: Status* status_; int64_t delta_; std::string new_value_; } proc(&proc_status, delta); // 口座プロセッサを該当アカウントに適用する。システムエラーの場合に失敗する。 const Status dbm_status = dbm->Process(account_id, &proc, true); if (dbm_status != Status::SUCCESS) { return dbm_status; } // 口座プロセッサの成否を報告する。 return proc_status; }
DBM::RecordProcessorというインターフェイスを継承してProcessFullとProcessEmptyというメソッドを実装するクラスを書く。そのクラスをDBMのProcessというメソッドに渡すと、該当のレコードがある場合にはProcessFullが、ない場合にはProcessEmptyが呼ばれる。双方とも文字列を返すことができ、その文字列が新しい値となる。この処理を行なっている間はその該当のレコードはロックされ、その変更はアトミックに反映される。この時点でなんか複雑に見えるかもしれないが、自分でロックの管理をするよりは遥かにマシだ。DB全体にロックをかければスレッドを使っている意味がなくなってしまうし、レコード毎にmutexを作るのは現実的でないので、自分でスロットロックを書かなければいけない。それは非生産的だ。アプリケーションを書くときはアプリケーションのロジックに集中できるようにすべきで、並列性のことなんて気にしないでいたいものだ。
レコードをロックしたまま行って良いのは、一瞬で終わる処理だけだ。レコードをロックしたままファイルを読んだりユーザの入力を受け付けたり他のデータベースにアクセスしたりしてはいけない。そんなことをしたら全体のスループットがガタ落ちになってしまう。読み出しから結果の反映まで時間のかかる処理を行いたいが、トランザクションのアトミック性は確保したい。そんなときは、Compare-and-Swapという技法を使う。同じくRecordProcessorを使ってCompareExchangeというメソッドを実装し、それをDBMクラスのユーティリティとしてビルトインしておく。こんな実装になる。
class RecordProcessorCompareExchange final : public DBM::RecordProcessor { public: RecordProcessorCompareExchange(Status* status, std::string_view expected, std::string_view desired, std::string* actual) : status_(status), expected_(expected), desired_(desired), actual_(actual) {} std::string_view ProcessFull(std::string_view key, std::string_view value) override { if (actual_ != nullptr) { *actual_ = value; } if (expected_.data() != nullptr && expected_ == value) { return desired_.data() == nullptr ? REMOVE : desired_; } status_->Set(Status::DUPLICATION_ERROR); return NOOP; } std::string_view ProcessEmpty(std::string_view key) override { status_->Set(Status::NOT_FOUND_ERROR); return NOOP; } private: Status* status_; std::string_view expected_; std::string_view desired_; std::string* actual_; }; Status DBM::CompareExchange( std::string_view key, std::string_view expected, std::string_view desired, std::string* actual) { Status impl_status(Status::SUCCESS); RecordProcessorCompareExchange proc(&impl_status, expected, desired, actual); const Status status = Process(key, &proc, true); if (status != Status::SUCCESS) { return status; } return impl_status; }
Compare-and-SwapとかCompare-and-Exhangeとか言われる技法は、既存の値が期待した値だった時にのみ、新しい値に置き換えるという操作を、アトミックに行うことである。これを応用すると、いかなるロングトランザクションも実装できる。上で挙げた口座の金額変更をCompareExchangeを使って実装してみる。
Status ProcessAccount(DBM* dbm, std::string_view account_id, int64_t delta) { // 成功するか、エラーになるまで、繰り返す。 while (true) { // トランザクション開始前の値を取得 const std::string old_value = dbm->GetSimple(account_id); // 新しい値を生成 const int64_t num_new_value = StrToInt(old_value) + delta; if (num_new_value < 0) { return Status(Status::INFEASIBLE_ERROR, "金が足りねーよ!); } const std::string new_value = ToString(num_new_value); // トランザクションの反映を試みる const Status status = dbm->CompareExchange(account_id, old_value, new_value); // 比較交換成功ということは、トランザクション中にレコードの値が変わっていないということなので、 // 一貫性が確保できているとみなせる。 if (status == Status::SUCCESS) { break; } // DUPLICATION_ERRORは値が変わっていることを意味する。一貫性が取れているか怪しいので // トランザクションをやり直す。それ以外の場合、システムエラーなので、それを返す。 if (status != Status::DUPLICATION_ERROR) { return status; } } return Status(Status::SUCCESS); }
この方法だと、ロックはCompareExchangeの中だけで完結しているので、アプリケーション側のコードでいかに遅いことをやろうとも問題ない。Webアプリケーションでページ遷移しながら入力を受け付けて最後にそれを反映させるような場合にはこの方法が向いている。上の例では該当アカウントで他の変更があったことが検出された場合には暗黙的に再試行しているが、その場合にはエラーを返すという挙動にしてもいいだろう。
アプリケーションロジックを含んだRecordProcessorを実装してProcessに渡すというやり方だと、データベースサーバのプロセスにアプリケーションロジックを組み込むことになるので、メンテしづらくなる。ロジックを変える度にデータベースサーバごと再起動させるのは現実的でない。一方で、CompareExchangeを使う方法であれば、DBMを単なるKVSのサービスとして運用して、クライアント側でアプリケーションロジックが完結させられる。なので、アクセスカウンタ程度のものすごい単純な処理は独自プロセッサを書いてProcessを直接呼ぶようにしても良いが、それなりに複雑なものはCompareExchangeを使うべきだと思う。ところで、複数レコードに跨った一貫性が必要な場合はどうするか。それはもうDBMを使っていてはいけない。