豪鬼メモ

一瞬千撃

DBサービスを作ろう その9 更新ログのメッセージキュー化

前回の記事で、レプリケーション機能の実装を見越して、データベースの更新ログ機能を設計した。実装を進めながら思ったのは、データベースに更新があった旨をログとして記録して、またそれを外部に通知するという動作は、メッセージキューと同じ要件を持つということだ。よって、汎用メッセージキューを実装して、それを更新ログに流用するように設計を見直す。その方が機能性も上がるし、テストもしやすい。


データベースの更新ログは、イベントログの一種とみなせる。いつどんな更新があったというイベント情報のシーケンスを、タイムスタンプ付きで記録していくのだ。つまり、ものすごく抽象化すると、以下のようなWriteメソッドだけを提供すればよい。タイムスタンプはミリ秒単位で表現することにして、int64_tで表現する。

// タイムスタンプ付きのメッセージをログに記録する。
// 結果の成否をステータスコードとして返す。
Status Write(int64_t timestamp, std::string_view message);

Writeに渡されたタイムスタンプと任意のバイナリメッセージは、適当なシリアライズをした上で、ファイルに追記される。ログを利用するスレッドは、タイムスタンプを使って読み出し開始位置を指定した上で、そこからイベントを一つずつ読み出していく。

// 次のメッセージとそのタイムスタンプを読み出す。
// 既に末尾まで呼んでいる場合、新規の書き込みをタイムアウトまで待つ。
// 結果の成否をステータスコードとして返す。
Status Read(double timeout, int64_t* timestamp, std::string* message);

ここで重要なのは、既存のイベントを連続して読み出せるようにするとともに、最新に追いついたならば、新しい更新を待ってから、それを読み込む機能である。言い換えるとログの更新をモニタリングして即時に読み込む機能である。これがあることで、単なるロガーではなく、メッセージキューとして機能することになる。

キューと言いながら、読み出したメッセージが即座に消えるわけではない。データベースの更新ログとしてこれを用いる場合、簡単に消えてもらっては困る。同じメッセージを重複して読まないように管理するのは読み出し側の責任である。とはいえ、前回述べたように、更新ログには冪等性を持たせているので、重複して適用しても、時間がより多くかかるだけで、結果は同じになる。よって、それなりのマージンを持たせて重複して適用するのが実運用上は望ましいだろう。タイムスタンプを使えばその実現は容易だ。レプリケーションで利用することを考えると、マルチスレーブ構成も想定すべきだ。となると、読み出しを行うスレッドが複数同時に存在することを前提とし、それを効率的に捌けるようにすべきだ。

単一のメッセージキューに複数のリーダーという構成だから、以下のようになクラス構成になるかな。実際のAPItkrzw_message_queue.hにあるのでご覧頂きたい。

class MessageQueue {
 public:
  // リーダオブジェクトのクラス。
  class Reader {
   public:
     Status Read(double timeout, int64_t* timestamp, std::string* message);
  };

  Status Write(int64_t timestamp, std::string_view message);

  // 指定したタイムスタンプ以降のメッセージを読み込むリーダを作成する。
  std::unique_ptr<Reader> MakeReader(int64_t min_timestamp);
};

もちろん、パスを指定してファイルを開くOpenメソッドや、ファイルを閉じるCloseメソッドなども必要となる。全てのメッセージを単一のファイルに追記していくと、ファイルが巨大になりすぎてしまうので、ファイルは適当なサイズで分割される。ファイル名には連番の接尾辞をつける。よって、Openで指定するのはメッセージファイルの接頭辞と、各ファイルの最大サイズとなる。

// 書き込みのためにファイルを開く。
Status Open(const std::string& prefix, int64_t max_file_size);

ファイルが指定したサイズで分割されるというのが、この実装の最大の特徴だ。単一のファイルに追記していく方法だと、古い情報を消せないので、実運用上で使いづらいものになってしまう。ファイルが適宜分割されたなら、古いファイルを定期的に消すことで、ストレージの容量を一定に保てる。用済みになった古いファイルは、運用中でも、rmコマンドとかの適当な手段で消してよい。用済みかどうかをどのように判断するのかは、メッセージキュー自身は知らない。

ローカルの更新ログとしては、データベースのバックアップファイルが作成できたら、そのバックアップファイルよりも古い更新ログは不要になる。レプリケーション用の更新ログとしては、全てのスレーブの中で最も追従が遅いものよりも古い更新ログは不要になる。スレーブを追加する際には、バックアップファイルを転送してから、そのタイムスタンプを起点としてレプリケーションを始めることだろう。となると、実運用では、定期的にバックアップを取った後に、古い更新ログの削除も行うことだろう。ともかく、バックアップを取るなり、レプリケーションの状態を確認するなりしたら、あるタイムスタンプより前の更新情報は不要であると判断できる。そのタイムスタンプより前の情報しか含まないメッセージファイルを消すという関数があると便利そうだ。

static Status RemoveOldFiles(const std::string& prefix, int64_t threshold);

ファイルベースのメッセージキューの実装で最も面倒なのは、ファイル分割にまつわる問題である。問題を単純にするために、ファイル名には厳格な制約を設ける。共通の接頭辞に10桁の連番の接尾辞をつけたもののみをメッセージファイルとみなす。"/var/log/hoge" が接頭辞であれば、"/var/log/hoge.0000000001" などである。ファイル名の辞書順とタイムスタンプの順番は整合していなければならないものとする。

リーダオブジェクトを作成する際には、読み出しの開始位置となるタイムスタンプを指定する。全てのファイルの内容を先頭から末尾まで読めば開始地点を確実に決められるが、それでは遅い。よって、ファイル名のID接尾辞を基準にして、読み飛ばしを行う。ここで注意すべきは、ファイル名のID接尾辞は、そのファイルを生成した順番に過ぎないということだ。そのファイルを読み飛ばして良いかどうかは、そのファイルに含まれる最も新しいメッセージのタイムスタンプが、指定されたものよりも古いかどうかで決まる。各ファイルの最後のレコードを読むには、そのファイル全体を読み込む必要があるが、それだと効率が悪い。よって、ファイルのヘッダに最後のレコードのタイムスタンプを持たせることにする。そうすると効率的にファイルを検索できる。

メッセージを書き込むライタオブジェクトと、書き込まれたメッセージを読み込むリーダオブジェクトの間の効率的なデータのやりとりを実現するのがメッセージキューの役目だ。また、リーダもライタもいつ死んだりキャンセルされるかわからないので、メッセージはファイルに記録する必要がある。ファイルシステムのバッファを介してリーダとライタの間でデータを共有するのは、実装依存になるので危険だ。よって、プロセス内でリーダとライタが同一のファイルオブジェクトを参照することで、整合性を維持しつつ、効率的なデータの授受を可能にすべきだ。それでいて、ファイルは適当な単位で分割されるので、スレッド間で共有されるオブジェクトをどうやって管理するかが難しい問題となる。std::shared_ptrを使って、各ファイルを開いているリーダオブジェクトとライタオブジェクトの数を数えることで、なんとか実装はできた。

リーダオブジェクトがファイルからメッセージを読んでいって、ファイルの末尾に到達した時、それがログ全体の最新のファイルであれば、ライタが次のメッセージを書き込むまで待機すべきだ。そうでない場合、次のファイルを開き直して読み込みを続けるべきだ。ファイルの末尾かどうかは、現在のオフセットとファイルサイズを比較すればわかる。疑似コードで書くとこんな感じの手順になる。

// キャンセルされるまでループ
while (!IsCanceled()) {
  // 現在位置を調べる
  if (PotionIsEndOfFile()) {
    // 現在位置がファイル末尾の場合
    if (IsLatestFile()) {
      // ライタに問い合わせて最新ファイルであれば、
      // 条件変数を使って更新を待つ
      if (!WaitForUpdate(timeout)) {
        // キャンセルされたら抜ける
        return false;
      }
    } else {
      // 最新ファイルでないなら、
      // 次のファイルを開き直す。
      OpenNextFile();
    }
  } else {
    // 現在位置がファイル末尾じゃないなら、メッセージを読み込む。
    ReadNextMessage(result);
    // タイムスタンプが適格であれば、結果を返す。
    if (result->timestamp >= min_timestamp_) {
      return true;
    }
  }
}

上記のループが設計できるかどうかが鍵だ。その他の実装は刺し身タンポポみたいなもんで、手を動かせばいいだけの話だ。それにしても、更新ログを取るだけでこんなに面倒くさいとは。昔の自分はよくレプリケーションなんて自分で実装したもんだと感慨深くなる。

レースコンディションについては気をつけねばならない。リーダ側では、上記のループの先頭でロックを確保し、末尾で解放する。リーダがWaitForUpdateで更新待ちに入った状態では、条件変数がロックを一時的に解放してくれるので、その間にライタ側はログの書き込みを行う。やけにリーダ側に有利な条件だが、これでよい。そもそもリーダよりライタの方が早いと、消せないログがどんどん蓄積してしまうので、運用できない。ライタが遅いのも困るが、ライタだけ早くても結局は破綻するので、やはりリーダ優先ポリシーが現実的だ。ライタ側で書き込みをバッファリングする最適化も考えられるが、更新ログがバッファ上にだけ存在するのはリスクが高いので、慎重に判断すべきだ。

性能評価を一応してみた、単一プロセスの中で、メッセージをひたすら書き込むスレッドと、それをひたすら読み込むスレッドを作る。私のノートPC上で、100万件の書き込みとその全ての読み込みを同時に行うのに、0.5秒ほどかかった。mutexと条件変数を使ったイベント駆動の処理である割には、そこそこ早いと言えるだろう。とはいえ、200万QpS以上のスループットを持つシステムで使うと、ボトルネックになり得る。Tkrzwはライブラリ単体では500万QPSとか出してくるので、ログの方が遅いという事態は普通に起こる。しかし、RPCを介したデータベースサーバのレプリケーションに使うならば、200万QPSというスループットが問題になることはないだろう。


まとめ。データベースの更新ログを実装していたら、同じ機能がメッセージキューとして使えることに気づいたので、コンポーネントとして切り出してみた。Tkrzw本体のライブラリに組み込まれているので、データベースの更新をモニタリングして何らかの処理を行うプログラムが簡単に書けるようになる。次回はこれを使って実際にレプリケーションを実現する。