豪鬼メモ

一瞬千撃

マルチレコードトランザクション機能

Tkrzwでマルチレコードトランザクションを実装してみたという話。シングルレコードトランザクション、つまり同一のレコードに複数の操作をアトミックに行う機能は従来から実装されていた。それを複数のレコードでもできるようにしたのがマルチレコードトランザクションだ。前回の記事で載せたプレゼン資料を書いていた途中で、そういやTODOリストに入れていたなと思い出して、実装してみた。

結論としては、複数レコードが関わるトランザクションがかなり簡単に書けるようになった。特にマルチレコードCASが便利なので使ってもらいたい。
f:id:fridaynight:20210531180839p:plain


データベースの文脈では、トランザクションとは、データベースに対する複数の操作をアトミックに実行することである。例えば、銀行口座から1000円を引き出すとしよう。そのためには、該当の口座の現在の預金金額を調べる参照操作と、そこから1000を引いた値を設定するという更新操作がアトミックに行われねばならない。参照操作と更新操作の間に別の更新操作が割り込むことは許されない。

Tkrzwでは単一のレコードに対する複数の操作をアトミックに行う機能を従来から提供している。Processというメソッドだ。それにコールバック関数を渡すと、該当のレコードの現在の値をパラメータとして実行し、戻り値を新しい値として設定してくれる。コールバック関数が実行されている間は該当のレコードはロックされているため、レースコンディションが起きないことは保証されている。

上述の預金引き出し処理を実装してみよう。レコードの値を10進数文字列とみなして、その数値から100を引いた値を設定する。ただし、預金が負数になる操作は許さないとする。

const int64_t delta = -1000;
Status status(Status::SUCCESS);
std::string new_value;
auto transact = [&](std::string_view key, std::string_view value) -> std::string_view {
  // 該当の講座がない場合、valueがNOOPになるので、エラーを返す
  if (value.data() == DBM::RecordProcessor::NOOP.data()) {
    status.Set(Status::NOT_FOUND_ERROR, "no such account");
    return DBM::RecordProcessor::NOOP;
  }
  // 値を10進数とみなして数値に変換し、金額を増減させる
  const int64_t balance = StrToInt(value) + delta;
  // 負数になった場合、エラーを返す
  if (balance < 0) {
    status.Set(Status::INFEASIBLE_ERROR, "insufficient balance");
    return DBM::RecordProcessor::NOOP;
  }
  // 結果を文字列として返す(生存期間を伸ばすため、文字列は外側で定義)
  new_value = ToString(balance);
  return new_value;
}
dbm.Process("foo", transact, true);

これ、めちゃ格好いいと思わないだろうか。ノーコードの流れに完全に逆らうフルコード。SQLのようなドメイン特化言語を使うでもなく、全てC++で書いてのけるという男塾。とはいえ、ラムダ式をレコードに適用するってのは、まるでJavaScriptを書いているかのような手軽さだ。


レコード単体のトランザクションだと、できることが限られる。より踏み込んで、複数のレコードの操作をアトミックにやりたいという時もあるだろう。例えば、口座Aから口座Bへ1000円を振り込むことを考える。これには、口座Aの残額を1000減らす操作と、口座Bの残額を1000増やすという操作を、アトミックに行う必要がある。仮に、口座Aからの出金操作をした直後に口座Bが解約されたとしたら、口座Aへの返金を行うロールバック処理をアプリケーションの責任で行わねばならない。アプリケーション側でトランザクションログを書き出した上で確実にロールバック処理を行うように実装することは可能だが、そもそもロールバック処理をしないで済むようにした方が楽だ。

したがって、口座Aのレコードと口座Bのレコードをロックした上で、双方のレコードの更新処理を行えるようにする。他のスレッドはロックされているレコードを更新できないので、ロールバックが必要な事態は発生しない。仮にロールバック的なことが必要だとしても、トランザクションの内部で元の値を設定しなおせばよい。

ProcessMultiという新しいメソッドが加わったので、それを使った振り込み処理を実装してみよう。口座Bの存在を確認し、口座Aを更新し、口座Bを更新する、という3つの操作をアトミックに行うことになる。

Status Transfer(DBM* dbm, std::string_view src_key, std::string_view dest_key,
                int64_t amount) {
  // トランザクションの状態を報告するためのオブジェクト
  // これがSUCCESSでない場合、残りの処理は飛ばされる
  Status op_status(Status::SUCCESS);

  // 振り込み先のレコードがあるか調べるコールバック
  auto check_dest =
      [&](std::string_view key, std::string_view value) -> std::string_view {
        if (value.data() == DBM::RecordProcessor::NOOP.data()) {
          op_status.Set(Status::NOT_FOUND_ERROR, "no such destination account");
        }
        return DBM::RecordProcessor::NOOP;
      };

  // 振り込み元のレコードを操作するコールバック
  std::string new_src_value;
  auto proc_src =
      [&](std::string_view key, std::string_view value) -> std::string_view {
        if (!op_status.IsOK()) {
          return DBM::RecordProcessor::NOOP;
        }
        if (value.data() == DBM::RecordProcessor::NOOP.data()) {
          op_status.Set(Status::NOT_FOUND_ERROR, "no such source account");
          return DBM::RecordProcessor::NOOP;
        }
        const int64_t current_balance = StrToInt(value);
        if (current_balance < amount) {
          op_status.Set(Status::INFEASIBLE_ERROR, "insufficient balance");
          return DBM::RecordProcessor::NOOP;
        }
        new_src_value = ToString(current_balance - amount);
        return new_src_value;
      };

  // 振り込み先のレコードを操作するコールバック
  std::string new_dest_value;
  auto proc_dest =
      [&](std::string_view key, std::string_view value) -> std::string_view {
        if (!op_status.IsOK()) {
          return DBM::RecordProcessor::NOOP;
        }
        if (value.data() == DBM::RecordProcessor::NOOP.data()) {
          op_status.Set(Status::APPLICATION_ERROR, "inconsistent transaction");
          return DBM::RecordProcessor::NOOP;
        }
        const int64_t current_balance = StrToInt(value);
        new_dest_value = ToString(current_balance + amount);
        return new_dest_value;
      };

  // 3つのコールバックを順番に実行する
  const Status status = dbm->ProcessMulti({
      {dest_key, check_dest},
      {src_key, proc_src},
      {dest_key, proc_dest}}, true);
  if (!status.IsOK()) {
    return status;
  }
  return op_status;
}

多少複雑になったが、アプリケーション側の責任でアトミック性を確保することの困難さを思えば、この程度のコード量で実現できるというのは素晴らしい。ラムダ式が外部変数をキャプチャできるのがとても便利で、あたかも単一の関数の中でブロックを書き分けている程度の気軽さで複数のコールバック関数を実装することができる。

トランザクションの処理は全てがロック内で行われ、他のスレッドは途中経過にアクセスできない。よって、トランザクションのACID特性のうち、AtomicityとConsistencyとIsolationは満たされていると言える。Durabilityに関しては、データベースを追記モードで運用し、またトランザクション後に呼んだSynchronizeの完了をもってトランザクション成功とみなせば、OSやプロセスの突然死に耐えるという程度で満たせる。

ロールバックを直接的に支援する機能はないが、そもそもProcessMultiのコールバックはロック内の局所時間で実行するのが前提なので、ロールバックしなきゃいけない事態は想定しづらい。一貫性が保てるかどうかの事前検査をしてから、実際のレコードの更新を始めればよいのだ。


複数レコードのロックをどうやって実装したがについて解説する。以前にも述べたが、Tkrzwのハッシュデータベース(HashDBM)は、ハッシュロックという特殊な機構で排他制御を行っている。ハッシュ表において各レコードが属するバケットを128個のスロットに分類した上で、各レコードが所属するバケットが所属するスロットのmutexをロックするという機構だ。こうすると、ハッシュ表の連結リストの保護とレコードのデータの保護を同時に行うことができるし、mutexオブジェクトの数を定数にしてメモリを節約できる。さらに、レースコンディションを起こすことなく、リハッシュでハッシュ表のサイズを変えられる。

複数のレコードをロックするということは、内部的には複数のスロットのロックをするということだ。ところで、複数のレコードが単一のスロットに割り当てられている場合、それを二重にロックしようとするとデッドロックになってしまう。また、ロックを確保する際の順番は常に一定でないとデッドロックしてしまう。例えば、Aの次にBをロックしようとするスレッドと、Bの次にAをロックしようとするスレッドが、互いにAとBのロックを確保してしまうと、デッドロックしてしまう。それを回避するには、ロックの対象となるスロットをIDでソートしてから、その昇順にロックし、処理が終わったら逆順にアンロックすればよい。リハッシュを行うスレッドは全てのスロットを昇順にロックしてから処理を行うので、ロック作業を部分的にでも実行しているスレッドがいるうちはリハッシュは行われない。

一方、B+木であるツリーデータベース(TreeDBM)では、排他制御はページ(=ノード)単位で行われる。同じページに属する複数のレコードが同時に更新されることはないようになっている。これを利用して、複数のレコードが所属する複数のページをロックすることにした。ハッシュロックの時と同様に、複数のロックを確保するので、デッドロックを避けるための作法が必要になる。ページIDでソートしてからロックし、逆順でアンロックするというものだ。

ところで、トランザクションのアトミック性のためのロックとB+木のページ管理のためのロックを同じにする必要はない。レコード単位のスロット化されたロックを用意するか、ページオブジェクトの上にレコードの数だけmutexを用意するという案もあった。しかし、そうすると、ページロックとレコードロックを併用するという二重管理になり、オーバーヘッドが増えてしまう。マルチレコードトランザクションの時だけレコードロックをかけるという案もあったが、説明が煩雑になるので避けた。トランザクションに関わる複数のレコードがたまたま同じページにあると並列処理性能が下がることにはなるが、しかし同じページにあるということはキャッシュ効果で直列処理性能が上がるので、ページロックであることの全体のスループットへの影響は軽微だろうと考えられる。

オンメモリのハッシュデータベース(TinyDBM)とオンメモリのツリーデータベース(BabyDBM)でも、それぞのファイル版と全く同じ構造で排他制御を行っている。よって、ファイル版と全く同じように、それぞれレコードスロットとページスロットをロックすることで、マルチレコードのアトミック性を確保することができる。オンメモリデータベースの場合はロックのオーバーヘッドのスループットに対する影響が大きいので、データ構造を保護するためのロックをトランザクションのアトミック性のためのロックとして共用できたことは意義深い。


マルチレコードトランザクションを応用して、マルチレコードのCAS(Compare-and-Swap)を実装することもできる。これを利用すると、いわゆるロングトランザクションが実現できる。例えば、上述の振り込み処理をCASで実現することを考える。トランザクション外で、口座Aの値を調べ、それが10000であることを確認する。同様に口座Bの値が5000であることを確認する。その後、「口座Aの値が10000であり、かつ口座Bの値が5000であるならば、口座Aの値を9000にして、かつ口座Bの値を6000にする」というトランザクションが発行できる。事前の調査とトランザクションの発行の間に別の更新処理が走っていたならばそのトランザクションは失敗するが、その場合は事前調査からまたやり直して、成功するか、断念する条件に当てはまるまで、繰り返せばよい。

ロングトランザクションの要点は、事前調査と更新適用の間にロックが維持されているわけではないので、ページ遷移を挟んだり、外部リソースの承認処理を入れたりといった、時間のかかる処理を挟むことができることだ。更新適用の際には、CASの能力で、関連するレコードの値が事前調査した時と整合するかを確認した上で、整合する場合にのみ実際の更新を行えばよい。CASさえあれば、どんなに複雑で時間のかかるトランザクションも、外部のロックなしに実装できる。

実際、マルチレコードCASをDBMクラスのユーティリティメソッドCompareExchangeMultiとして実装しておいた。事前条件となるレコードのキーと値のリストと、事前条件の適合時に設定するレコードのキーと値のリストを渡す。戻り値は、成功時にはSUCCESSで、失敗時にはINFEASIBLE_ERRORだ。これを使って、上述の振り込み処理を実装してみる。

Status TransferByCompareExchange(
    DBM* dbm, std::string_view src_key, std::string_view dest_key, int64_t amount) {
  // 処理が成功するか、実行不能になるまでは、トランザクションを試みる。
  constexpr int32_t max_tries = 100;
  for (int32_t num_tries = 0; num_tries < max_tries; num_tries++) {

    // 振込元の預金額を調べる
    std::string src_value;
    Status status = dbm->Get(src_key, &src_value);
    if (!status.IsOK()) {
      return status;
    }
    const int64_t src_balance = StrToInt(src_value);
    if (src_balance < amount) {
      return Status(Status::INFEASIBLE_ERROR, "insufficient balance");
    }

    // 振込先の預金額を調べる
    std::string dest_value;
    status = dbm->Get(dest_key, &dest_value);
    if (!status.IsOK()) {
      return status;
    }
    const int64_t dest_balance = StrToInt(dest_value);

    // 振り込み後の預金額に基づいた新しい値を作る
    const std::string src_new_value = ToString(src_balance - amount);
    const std::string dest_new_value = ToString(dest_balance + amount);
    
    // 両方のアカウントの値が検査時と変わっていない場合だけ、更新を行う
    const std::vector<std::pair<std::string_view, std::string_view>> expected =
        {{src_key, src_value}, {dest_key, dest_value}};
    const std::vector<std::pair<std::string_view, std::string_view>> desired =
        {{src_key, src_new_value}, {dest_key, dest_new_value}};
    status = dbm->CompareExchangeMulti(expected, desired);
    if (status.IsOK()) {
      return Status(Status::SUCCESS);
    }
    if (status != Status::INFEASIBLE_ERROR) {
      return status;
    }
  }

  return Status(Status::INFEASIBLE_ERROR, "too busy");
}

CompareExchangeMultiに渡す事前条件が満たされなかった場合というのは、他のスレッドがどちらかのアカウントを更新したことを意味するので、その場合は事前に調べた預金額とそれに基づいて作られた新しい預金額の妥当性はない。その場合、現在の預金額を調べるところからやり直せば良い。

ロールバック機能なんて正常系には要らない。Kyoto Cabinetに正常系のロールバック機能を入れたのは今でも後悔している。CASだけあれば、何もかもがAtomicでConsistentでIsolatedになるのだ。あとはDurabilityだけが問題になる。上述したように、障害時のロールバックには追記更新モードが備えるタイムマシン機能を使えばよいので、ソフトウェアレベルでのACID特性は満たされたと主張できる。


まとめ。Trkzwのデータベースでマルチレコードトランザクションが使えるようになった。さすがに銀行口座の管理にDBMを使う奴はいないし、そもそも異なる銀行の異なるシステム間で共通したロックを管理するのが不可能だ。しかし、ゲームサーバの中でエージェント同士がポイントやアイテムを授受するなど、複数レコードが関わるトランザクションを実装したい場合は多いだろう。アプリケーションに排他制御の責任を持たせるとバグの温床になるので、データベースライブラリの支援があるのは便利だ。

ProcessMultiメソッドの中では任意のコールバック関数が実行できるので、どんなに複雑な処理でもアトミックに実現できる。ただし、実行に時間がかかりそうな処理をロックの中でやると並列処理性能が劣化するので、その場合はCompareExchangeMultiメソッドを使って事前検査とCAS処理をループさせる方針にすべきだ。