豪鬼メモ

一瞬千撃

DBサービスを作ろう その12 非同期レプリケーションのプロトコルと実装

データベースサーバの非同期レプリケーション機能を実装したが、そのプロトコルと実装の詳細について述べつつ、妥当性について検証しよう。非常に細かくて重箱をつつく話になるが、神は委細に宿るとも言うし、ないがしろにできないことばかりだ。
f:id:fridaynight:20210930205558p:plain

プロトコルと更新ログのフォーマット

データベースの更新ログを即時に他のサーバに転送することで、ハードウェア障害に耐えるシステムを実現するのが、レプリケーション機能である。ここでは、更新が発生した元のデータベースサーバを「マスタ」と呼び、その更新ログを受け取ってマスタに追従しようとするデータベースサーバを「スレーブ」と呼ぶ。スレーブのデータベースはマスタのデータベースのホットバックアップとみなすことができる。すなわち、マスタのマシンが突然故障しても、スレーブのデータベースが生きていれば、データは(ほとんど)失われないし、(ほとんど)無停止で運用を継続することができる。

前回も述べたが、Tkrzw-RPCでは非同期レプリケーションを実装している。マスタはスレーブの存在など気にせずに更新処理を行い、スレーブが自分の責任でマスタから更新を吸い出す。スレーブは、マスタのどの時点の更新までに追従しているかをタイムスタンプを保持して管理する。一方で、マスタはスレーブの状況について責任を持たない。「指定されたタイムスタンプ以降の更新ログを寄越せ」というリクエストに応答して、該当の更新ログのストリームを出力するだけだ。これはgRPCの文脈ではサーバサイドストリームとして表現される。具体的には以下のようなプロトコルになる。

// Replicateメソッドのリクエスト
message ReplicateRequest {
  // レプリケーションを開始するタイムスタンプ
  int64 min_timestamp = 1;
  // クライアントのサーバID
  int32 server_id = 2;
  // 更新ログがない場合に次の更新ログを待つ秒数
  double wait_time = 3;
}

// Replicateメソッドのレスポンス
// 最初のレスポンスはOP_NOOPで、自分のサーバIDを通知する
message ReplicateResponse {
  enum OpType {
    // 処理を行わない。タイムスタンプだけの通知に使う
    OP_NOOP = 0;
    // レコードを設定する
    OP_SET = 1;
    // レコードを削除する
    OP_REMOVE = 2;
    // 全てのレコードを削除する
    OP_CLEAR = 3;
  }
  // ステータスコードとメッセージ
  StatusProto status = 1;
  // その更新のタイムスタンプ
  int64 timestamp = 2;
  // その更新を引き起こしたサーバID
  int32 server_id = 3;
  // その更新の対象となるDBMのインデックス
  int32 dbm_index = 4;
  // その更新の種類
  OpType op_type = 5;
  // その更新のレコードのキー
  bytes key = 6;
  // その更新のレコードの値
  bytes value = 7;
}

service DBMService {
  // リクエストは単体で、レスポンスはストリーム
  rpc Replicate(ReplicateRequest) returns (stream ReplicateResponse);
}

レスポンスの最初には、自分(=マスタ)のサーバIDを伝達する。スレーブはマスタのアドレスは知っているがマスタのIDは知らなくてもよいことになっているので、ここでマスタのサーバIDを教えることが必要だ。サーバIDは、スレーブ側の更新ログに記録される。ここで、サーバ1で起こった更新SET("foo", "bar")が、サーバ2に伝搬して、さらにサーバ3に伝搬されることを考えよう。それぞれに記録される更新ログは以下のようになる。

  • サーバ1 : timestamp=12345, server_id=1, dbm_index=0, op=SET, key="foo", value="bar"
  • サーバ2 : timestamp=12346, server_id=1, dbm_index=0, op=SET, key="foo", value="bar"
  • サーバ3 : timestamp=12347, server_id=2, dbm_index=0, op=SET, key="foo", value="bar"

更新ログに記録されるタイムスタンプは、そのマシンの時間軸で表現される。よって、同一の更新のタイムスタンプが、マスタとスレーブで異なるのは当然であり、スレーブの時計がずれていればスレーブのタイムスタンプの方が小さくなることすらあるが、それは問題ない。スレーブ側に記録されるタイムスタンプファイルの値がマスタ側の時系列で生成されてさえいれば、不整合は起こりえない。そして、更新ログの中のサーバIDには、自分が起こした更新なら自分のIDを、マスタから受け取った更新指示による場合はマスタのIDを採用する。よって、サーバ2の更新ログにはサーバ1のIDが記録され、サーバ3の更新ログにはサーバ2のIDが記録される。

こうしておくと、デュアルマスタ構成でもログが循環せずに済む。例えば、サーバ2のマスタとしてサーバ3を指定した場合、サーバ3の更新ログをサーバ2に送ろうとするが、そこでサーバIDが2のものは捨てられるので、循環が未然に防げる。サーバIDは経路のトレース情報とみなせるが、直前のノードしか記録していないので、2ホップ以上の循環は検出できない。よって、三角貿易のように、サーバ1→サーバ2→サーバ3→サーバ1というトリプルマスタトポロジを組んだら、無限ループが始まる。経路情報を全て記録したり、TTLを持たせるなどすれば任意のホップ数の循環を抑止できるが、今のところ実装していない。トリプルマスタにするくらいなら、デュアルマスタのアクティブマスタにスレーブを足す方が現実的だ。

マスタが死んで、新たなマスタを立てたことを想定しよう。ある更新のログは、マスタに記録されてから、スレーブに伝搬して、そこで記録される。よって、基本的には、同じ更新のタイムスタンプは、マスタの方がスレーブより小さい。ところで、スレーブはマスタの更新のどこまで読み出しているかのタイムスタンプを持っている。レプリケーションのマスタを別のスレーブに切り替えた場合、かつてのマスタのタイムスタンプは、全体的に新しい方向にシフトしている新しいマスタの時間軸においては、相対的には小さく(古く)なる。つまり、同じタイムスタンプを使う場合、同じ更新が2回読まれることになる。冪等性により重複適用は問題ないのだが、新たなマスタのタイムスタンプが過去方向にずれている場合には、読みそこねる更新が発生しうるので、問題だ。よって、レプリケーション開始時に開始点を過去方向にずらす機能がある。

更新ログにdbm_indexが記録されているが、これは単一のサーバが複数のデータベースを扱うためだ。レプリケーションを行う場合、マスタとスレーブのデータベースの数は同じであることを前提とする。数が合わない場合の溢れた更新ログは無視されるだけだが、おそらく望ましい結果にはならないだろう。数さえあっていれば型が違っていても大丈夫なのが面白いところで、CacheDBMをHashDBMにレプリケーションしたり、TreeDBM同士で圧縮の有無を変えたりすることもできる。

スレーブがマスタに完全に追従できている状態では、新たな更新があるまで処理が待ち状態になる。実際には、リクエストで指定した最大時間まで待ち、それでも更新が来なければ、マスタ側のタイムスタンプのみを返す。最大時間のデフォルトは1秒である。つまり、1秒に1回必ずデータが送られる。スレーブ側のタイムスタンプは更新ログを受け取った時に更新され、もし接続が切断された場合には、そのタイムスタンプの場所から再開する。ということは、最後の更新ログは2回適用される。同一タイムスタンプの更新が複数あることを考えると、この挙動は必須だ。ただし、更新ログが来ないとタイムスタンプが更新されないと、マスタの更新がない場合、スレーブが追従していないように見えてしまう。タイムアウト時に現在時刻のタイムスタンプを送信すれば、スレーブのタイムスタンプは現在時刻付近に更新されるし、最後の更新が重複して適用されることもない。

いずれにせよ、1秒なりの時間、マスタとスレーブ(つまりサーバとクライアント)のセッションを維持したまま、処理がブロックするというのが、レプリケーションプロトコルの最大の特徴である。スレーブ側ではレプリケーション専用のスレッドを割り当てるので、スレッドをブロックさせて待てば良い。マスタ側では、同期APIの場合、専用のスレッドを割り当ててブロックさせて待てばよい。非同期APIの場合、スレッドがブロックすると同じメッセージキューの他のセッションが全てブロックしてしまうので、ノンブロッキングで処理を行わねばならない。ここが最も厄介な点であり、それについては後ほど詳述する。

マスタの同期APIの実装

非同期レプリケーションのマスタが備えるべきReplicateメソッドの実装を見ていこう。上述のプロトコルバッファによるサービス定義をコンパイルすると、同期API用と非同期API用にインターフェイスが生成される。まずは同期APIから実装しよう。protocによって生成されたファイルtkrzw_rpc.grpc.hには、DBMService::Serviceという抽象クラスが定義され、それはReplicateという純粋仮想関数を持つので、それを継承してオーバライドするクラスを実装する。ここでは、実装用の別のクラスを他重継承して、そのReplicateImpl関数に処理を委譲する。

class DBMServiceImpl : public DBMServiceBase, public DBMService::Service {
 public:
  grpc::Status Replicate(
      grpc::ServerContext* context, const tkrzw::ReplicateRequest* request,
      grpc::ServerWriter<tkrzw::ReplicateResponse>* writer) override {
    return ReplicateImpl(context, request, writer);
  }
  ...
};

DBMServiceBaseクラスを切り出した理由は同期APIと非同期APIで実装を共有するためだ。RplicateImpl関数では、接続がキャンセルされない限りループし続けて、クライアントに更新ログを送り続ける。

public DBMServiceBase {
 public:
  grpc::Status ReplicateImpl(
      grpc::ServerContext* context, const tkrzw::ReplicateRequest* request,
      grpc::ServerWriter<tkrzw::ReplicateResponse>* writer) {
    // 次の更新をTkrzwのメッセージキューから抜き出すためのリーダ
    std::unique_ptr<MessageQueue::Reader> reader;
    // 無限ループ
    while (true) {
      // キャンセルされたら抜ける
      if (context->IsCancelled()) {
        return grpc::Status(grpc::StatusCode::CANCELLED, "cancelled");
      }
      // リーダから次の更新ログを読み込む
      tkrzw::ReplicateResponse response;
      const grpc::Status status = ReplicateProcessOne(
          &reader, context, *request, &response);
      if (!status.ok()) {
        return status;
      }
      // 更新ログをクライアントに送信する
      if (!writer->Write(response)) {
        break;
      }
    }
    return grpc::Status::OK;
  }
  ...
};

次の更新ログを取り出すReplicteProcessOneが最も面白いところだ。同じくDBMServiceBaseクラスの中で定義される。TkrzwのデータベースはTkrzwのメッセージキュークラスを使って更新ログを書き出すが、そのクラスはモニタ機能付きのリーダ機能も備える。一定時間以内に次の更新が読み込めればそれを返し、そうでなければタイムアウトした旨と現在のタイムスタンプだけを返す。

public DBMServiceBase {
 public:
  grpc::Status ReplicateProcessOne(
      std::unique_ptr<MessageQueue::Reader>* reader, grpc::ServerContext* context,
      const tkrzw::ReplicateRequest& request, tkrzw::ReplicateResponse* response) {
    // メッセージキューのリーダが存在しなければ作る。所有権は呼び出し側が持つ
    if (*reader == nullptr) {
      LogRequest(context, "Replicate", &request);
      if (mq_ == nullptr) {
        return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "disabled update logging");
      }
      if (request.server_id() == server_id_) {
        return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "self server ID");
      }
      *reader = mq_->MakeReader(request.min_timestamp());
      // 最初のレスポンスでは、サーバIDを教えて、何もしないという指示をする
      response->set_op_type(ReplicateResponse::OP_NOOP);
      response->set_server_id(server_id_);
      return grpc::Status::OK;
    }
    int64_t timestamp = 0;
    std::string message;
    double wait_time = request.wait_time();
    // 無限ループで、読み込んだ更新ログを出力していく
    while (true) {
      // キャンセルされたら抜ける
      if (context->IsCancelled()) {
        return grpc::Status(grpc::StatusCode::CANCELLED, "cancelled");
      }
      // 待ち時間がゼロの場合、おそらくタイムアウトするので、その際に現在時刻を出力に反映
      // させるべく、メッセージキューに取り込んでおく
      if (wait_time <= 0) {
        mq_->UpdateTimestamp(-1);
      }
      // 次の更新ログを読み込む      
      Status status = (*reader)->Read(&timestamp, &message, wait_time);
      if (status == Status::SUCCESS) {
        // 取得したメッセージのタイムスタンプを出力に設定
        response->set_timestamp(timestamp);
        // メッセージをデシリアライズして更新ログを組み立てる
        DBMUpdateLoggerMQ::UpdateLog op;
        status = DBMUpdateLoggerMQ::ParseUpdateLog(message, &op);
        if (status == Status::SUCCESS) {
          // クライアント(スレーブ)のサーバIDと同じサーバIDの更新ログは無視する
          // ここがデュアルマスタでの循環を抑止する
          if (op.server_id == request.server_id()) {
            continue;
          }
          // 更新内容を出力に設定
          switch (op.op_type) {
            case DBMUpdateLoggerMQ::OP_SET:
              response->set_op_type(ReplicateResponse::OP_SET);
              break;
            case DBMUpdateLoggerMQ::OP_REMOVE:
              response->set_op_type(ReplicateResponse::OP_REMOVE);
              break;
            case DBMUpdateLoggerMQ::OP_CLEAR:
              response->set_op_type(ReplicateResponse::OP_CLEAR);
              break;
            default:
              break;
          }
          response->set_server_id(op.server_id);
          response->set_dbm_index(op.dbm_index);
          response->set_key(op.key.data(), op.key.size());
          response->set_value(op.value.data(), op.value.size());
        }
      } else if (status == Status::INFEASIBLE_ERROR) {
        // タイムアウトした場合はここに来る
        if (wait_time > 0) {
          // 待ち時間を指定したのにタイムアウトした直後のループでは、メッセージキューの
          // タイムスタンプを現在時刻に更新して、もう一度だけノンブロッキングのメッセージ
          // 取得を試みる
          wait_time = 0;
          continue;
        }
        // ここに来たということは現在時刻に更新したタイムスタンプを得ている
        // その値はどの更新ログよりも新しいか最新と同じことが保証される。
        response->set_timestamp(timestamp);
      }
      // ステータスコードを設定する
      response->mutable_status()->set_code(status.GetCode());
      response->mutable_status()->set_message(status.GetMessage());
      break;
    }
    return grpc::Status::OK;
  }
  ...
};

タイムアウトした場合には、メッセージキューが管理するタイムスタンプを現在時刻に更新しつつ、その値を返す。地味にここが重要だ。現在時刻はNTPや閏秒などによる調整で戻ることがあるため、それをそのままタイムスタンプにするわけにはいかない。メッセージキューは、現在時刻を監視しつつ、決して戻らないタイムスタンプを生成する機能を持っている。新しい更新ログが来たりUpdateTimestampメソッドで指示された場合には現在時刻を見てミリ秒単位のタイムスタンプを発行するが、それが前に発行したものよりも古い場合には、前に発行したものと同じものを返す。

現在時刻を出力のタイムスタンプに反映しておくと、それはスレーブ側のタイムスタンプを最新のものに置き換える効果を持つので、最後の更新ログが重複して読み込まれる可能性をほぼゼロにしてくれる。重複して読んでも冪等性により問題はないのだが、性能を考えると重複は少ない方がよい。タイムアウトしているということは更新がほとんどないということであり、おそらくCPUが暇なので、こういう贅沢な処理をしても問題ない。

マスタの非同期APIの実装

次に、非同期APIの実装を見ていく。以前にも述べたが、非同期APIの方が性能が良いので非同期APIだけを実装すれば良いのだが、テストや性能比較のために両方実装している。同期APIとの大きな違いは、スレッドを専有しながら一連のストリームの処理を処理することができず、gRPCのメッセージキューを使ったイベント駆動の処理を行わねばならないことである。そこで扱われるイベントとは、セッションの状態変化であり、読み書きが可能になった状態と、セッションの切断状態への変化である。よって、それに応じたインターフェイスAsyncDBMProcessorInterfaceを定義する。これを実装したクラスのオブジェクトをメッセージキューに突っ込むことで、様々なRPCメソッドを同一のキューで駆動できる。

リクエストが到着したならProceedメソッドを呼び、セッション切断されたならCancelメソッドを呼ぶ。セッションが切断された場合、それがサーバ側のシャットダウンによるものなのか、クライアントが接続を切断したのかを区別する必要があるので、is_shutdownというパラメータを持たせている。ところで、コンストラクタがパラメータはカウンタを受け取る。これはコンストラクタでインクリメントされ、デストラクタでデクリメントされる。これによって生存中のオブジェクトの数が常に把握できる。この値はアクティブなメソッド呼び出しの数と一致するので、Inspectメソッドで報告して負荷の判断の一助にしてもらう。また、何もしていない時にこの値が0になるのを確認すれば、メッセージキュー関連でメモリリークしていないことが確認できる。特に、後述する実装は複雑な構成になるので、この確認は重要だ。

class AsyncDBMProcessorInterface {
 public:
  explicit AsyncDBMProcessorInterface(std::atomic_int32_t* num_active_calls)
      : sc_(num_active_calls) {}
  virtual ~AsyncDBMProcessorInterface() = default;
  virtual void Proceed() = 0;
  virtual void Cancel(bool is_shutdown) = 0;
  
 private:
  ScopedCounter<std::atomic_int32_t> sc_;
};

Replicateメソッド用の非同期処理クラスの実装を見ていこう。この実装はかなり苦労したので、詳細に説明する。まず前提条件として、非同期APIはgRPCのメッセージキューを読み出しながらイベント駆動で処理を進めるのだが、レプリケーションデータの送信はデータベースに接続されたTkrzwのメッセージキューを読み出しながらイベント駆動で処理を進めなければならない。よって、動力源が二つある機械を制御するという厄介な問題に取り組むことになる。いろいろ試して行き着いた結論としては、Tkrzwのメッセージキューの監視をバックグラウンドのスレッドで行わせつつ、その状態変化をアラームの「キャンセル」としてgRPCのキューに通知するスタイルが最善ということだ。待ち状態に入った時にアラームを作り、読み込めるようになったらアラームをキャンセルするっていう発想に至るまでが大変だった。更新ログの読み出しはあくまでフォアグラウンドのスレッドが行い、バックグラウンドのスレッドは読み出しが可能である旨を通知する機能に特化させる。

実際のコードを見てみよう、サーバストリームなので、CREATE(初期化処理)、BEGIN(最初の書き込み)、WRITING(書き込み中)、WAITING(データベースの更新待ち)、FINISH(終了処理)という状態を持つステートマシンとして実装する。

class AsyncDBMProcessorReplicate : public AsyncDBMProcessorInterface {
 public:
  enum ProcState {CREATE, BEGIN, WRITING, WAITING, FINISH};

  AsyncDBMProcessorReplicate(
      DBMAsyncServiceImpl* service, grpc::ServerCompletionQueue* queue)
      : AsyncDBMProcessorInterface(&service->num_active_calls_),
        service_(service), queue_(queue),
        context_(), stream_(&context_), proc_state_(CREATE),
        reader_(nullptr), rpc_status_(grpc::Status::OK),
        deadline_(std::chrono::system_clock::now()),
        alarm_(), wait_time_(0), bg_thread_(), alive_(true), mutex_() {
    // 作られた直後に初期化処理を行う
    Proceed();
  }

  ~AsyncDBMProcessorReplicate() {
    // デストラクタでは、バックグラウンドのスレッドを停止させる
    alive_.store(false);
    cond_.notify_one();
    if (bg_thread_.joinable()) {
      bg_thread_.join();
    }
  }

  void MonitorQueue() {
    // バックグラウンドのスレッドの処理。デストラクタが呼ばれるまで無限ループして、
    // ブロッキング
    while (alive_.load()) {
      if (proc_state_ == WAITING) {
        // データベース更新がない場合にここに来る。
        // 最大1秒だけブロッキングして、次の更新を読み込む
        Status status(Status::SUCCESS);
        {
          std::lock_guard<std::mutex> lock(mutex_);
          status = reader_->Wait(1.0);
        }
        // 更新が読み込めたなら、アラームをキャンセルしてそれを通知する
        if (alive_.load() && status == Status::SUCCESS) {
          alarm_.Cancel();
        }
      }
      // 更新の転送中にビジーループするのを避けるために、スリープ
      std::unique_lock<std::mutex> lock(mutex_);
      cond_.wait_for(lock, std::chrono::milliseconds(50));
    }
  }

  void Proceed() override {
    if (proc_state_ == CREATE) {
      // IsCancelを後で呼ぶために必要。
      context_.grpc::ServerContext::AsyncNotifyWhenDone(nullptr);
      // リクエストの読み込みを開始して、その終了後にキューに自分を入れる
      proc_state_ = BEGIN;
      service_->RequestReplicate(&context_, &request_, &stream_, queue_, queue_, this);
    } else if (proc_state_ == BEGIN || proc_state_ == WRITING || proc_state_ == WAITING) {
      // リクエストが読み込めたらここに来る
      if (proc_state_ == BEGIN) {
        // 初回の呼び出しでは、自分の子を産卵して、次のリクエストに備える
        new AsyncDBMProcessorReplicate(service_, queue_);
        // リクエストに指定された待ち時間を待避させつつ、ゼロに置き換える
        wait_time_ = request_.wait_time() < 0 ? INT32MAX : request_.wait_time();
        request_.set_wait_time(0);
      }
      // 同期APIと同じルーチンで、次の更新を読み込む。
      // 待ち時間はセロなので、ノンブロッキングの処理になる。
      response_.Clear();
      {
        std::lock_guard<std::mutex> lock(mutex_);
        rpc_status_ = service_->ReplicateProcessOne(&reader_, &context_, request_, &response_);
      }
      // 初回の呼び出しでは、バックグラウンドスレッドを起動する
      // ReplicateProcessOneでreaderが生成されたのを待ってからスレッドを起動する必要がある
      if (proc_state_ == BEGIN) {
        bg_thread_ = std::thread([&]{ MonitorQueue(); });
      }
      if (rpc_status_.ok()) {
        if (response_.status().code() == tkrzw::Status::INFEASIBLE_ERROR &&
            proc_state_ == WRITING &&
            std::chrono::system_clock::now() <= deadline_) {
          // 更新ログが読み込めなかった場合で、かつ前回既に出力を行っている場合は、
          // 締め切り時間まで待ち状態に入る。
          // 更新があった場合にはキャンセルされて、キャンセルイベントがコールバック
          // されることで読み込みが行われる
          proc_state_ = WAITING;
          alarm_.Set(queue_, deadline_, this);
          cond_.notify_one();
        } else {
          // 更新ログが読み込めた場合か、前回の呼び出しで待ちを行った場合はここに来る
          // 締め切り時間を次回のために更新して、更新ログか空のタイムスタンプを送信
          proc_state_ = WRITING;
          deadline_ = std::chrono::system_clock::now() + std::chrono::milliseconds(
              std::max<int64_t>(1, wait_time_ * 1000));
          stream_.Write(response_, this);
        }
      } else {
        // エラーならセッションを切って終了状態に移行
        proc_state_ = FINISH;;
        stream_.Finish(rpc_status_, this);
      }
    } else {
     // 終了状態なら死ぬ
      delete this;
    }
  }

  void Cancel(bool is_shutdown) override {
    // セッションが切断されるか、アラームがキャンセルされるとここに来る
    if (is_shutdown) {
      // サーバがシャットダウンする場合は、死ぬ
      delete this;
    } else if (proc_state_ == WAITING) {
      // 待ち状態でキャンセルされたということなので、更新を読み込べく処理を継続
      Proceed();
    } else if (proc_state_ == WRITING) {
      // 書き込みしようとしてキャンセルされたので、セッションを終了する
      proc_state_ = FINISH;;
      stream_.Finish(rpc_status_, this);
    } else {
      // セッションが終了した状態でキャンセルしたので、死ぬ
      delete this;
    }
  }

 private:
  DBMAsyncServiceImpl* service_;
  grpc::ServerCompletionQueue* queue_;
  grpc::ServerContext context_;
  grpc::ServerAsyncWriter<ReplicateResponse> stream_;
  std::atomic<ProcState> proc_state_;
  std::unique_ptr<MessageQueue::Reader> reader_;
  tkrzw::ReplicateRequest request_;
  tkrzw::ReplicateResponse response_;
  grpc::Status rpc_status_;
  std::chrono::time_point<std::chrono::system_clock> deadline_;
  grpc::Alarm alarm_;
  double wait_time_;
  std::thread bg_thread_;
  std::atomic_bool alive_;
  std::mutex mutex_;
  std::condition_variable cond_;
};

gRPCのメッセージキューによる状態変更とバックグラウンドのスレッドによる状態変更が混在する非常に難解なコードになっているが、今のところ、これ以上簡潔に書くアイデアがない。バックグラウンドスレッドのブロッキングが1秒毎であることの理由は、サーバがシャットダウンする際に1秒以内に停止したいからだ。

gRPCのアラーム機能は、JavaScriptのsetTimeoutと同じノリで、一定時間経過後にイベントを発動させる。この機能の面白いところは、キャンセルすると、キャンセルした旨を表すイベントが即座に発動することだ。状態変化の通知を即座に行いたい場合には、それを逆手に取って、待ち時間を無限もしくは長時間にしたアラームを登録しておいて、状態変化を検出した際にアラームをキャンセルすればよい。

状態変化を検知した時に新たなアラームを入れるという方法も試したのだが、それだとリソース管理が難しくなる。gRPCのメッセージキューに入れるAsyncDBMProcessorInterfaceオブジェクトの所有権はメッセージキューが持つ。よって、作ったオブジェクトはすぐにキューに入れねばならないし、コールバックが呼び出されたオブジェクトは自身をキューに入れ直すか、死ぬかを選ばねばならない。もし、状態変化を受け取った時に自分をキューに入れてしまうと、それに違反する。一方、更新ログの取得が成功すればWriteで自分をキューに入れて、タイムアウトした場合にはアラームで自分をキューに入れるという手順は問題ない。アラームが入っている場合にキャンセルしても、キャンセルイベントをきちんと拾えば、所有権の管理は混乱しない。

非同期APIモードのレプリケーション実装の仕上げとして、gRPCのメッセージキューに上述のタスクをぶっこんで駆動させる処理を書く。is_shutdownパラメータは呼び出し側に変数のポインタで、これは終了用のシグナルハンドラによってfalseに変更される。

inline void DBMAsyncServiceImpl::OperateQueue(
    grpc::ServerCompletionQueue* queue, const bool* is_shutdown) {
  // タスクを生成すると同時にキューに登録する
  new AsyncDBMProcessor<ChangeMasterRequest, ChangeMasterResponse>(
      this, queue, &DBMAsyncServiceImpl::RequestChangeMaster,
      &DBMServiceBase::ChangeMasterImpl);
  // 無限ループ
  while (true) {
    // 次のメッセージを読み込む。tagにはAsyncDBMProcessorInterfaceオブジェクトか
    // nullptrが入る
    void* tag = nullptr;
    bool ok = false;
    if (!queue->Next(&tag, &ok)) {
      break;
    }
    auto* proc = static_cast<AsyncDBMProcessorInterface*>(tag);
    if (ok) {
      // セッションがキャンセルされていないのなら、処理を進める
      if (proc != nullptr) {
        proc->Proceed();
      }
    } else {
      // セッションがキャンセルされたなら、タスクもキャンセルする
      // レプリケーションの更新待ちのタイムアウトの場合もここに来る
      if (proc != nullptr) {
        proc->Cancel(*is_shutdown);
      }
      // シャットダウンなら、抜ける
      if (*is_shutdown) {
        break;
      }
    }
  }
}

スレーブの実装

マスタの機能を司るReplicateメソッドはこうして実装できたわけだが、道はまだ終わりではない。半分来ただけだ。スレーブとしての機能も書かねばならん。スレーブはマスタにとってはクライアントなので、クライアントライブラリの中に大部分の機能を実装して、サーバ上ではそれを呼び出すだけにする。

クライアントライブラリのRemoteDBMクラスに、Replicatorという子クラスを定義して、それをMakeReplicatorメソッドで作る。ReplicatorはStartでレプリケーションを開始して、その後にReadを繰り返して呼び出して更新ログを取得する。

class RemoteDBM final {
  class Replicator {
   public:
    // レプリケーションを開始する
    // 開始タイムスタンプ、自分のサーバID、最大待ち時間を設定
    Status Start(int64_t min_timestamp, int32_t server_id = 0, double wait_time = -1);
    // 接続したサーバのIDを取得する
    int32_t GetMasterServerID();
    // 次の更新ログを読み込む
    Status Read(int64_t* timestamp, ReplicateLog* op);
  };

  // レプリケータを作る
  std::unique_ptr<Replicator> MakeReplicator();
};

ReplicatorのStartメソッドは、レプリケーションを開始する。

Status RemoteDBMReplicatorImpl::Start(
    int64_t min_timestamp, int32_t server_id, double wait_time) {
  ...
  // タイムアウトの設定
  context_.set_deadline(std::chrono::system_clock::now() + std::chrono::microseconds(
      static_cast<int64_t>(dbm_->timeout_ * 1000000)));
  // 開始タイムスタンプ、サーバID、待ち時間をリクエストに指定
  ReplicateRequest request;
  request.set_min_timestamp(min_timestamp);
  request.set_server_id(server_id);
  request.set_wait_time(wait_time);
  // リクエストを送信
  stream_ = dbm_->stub_->Replicate(&context_, request);
  // 最初のレスポンスを受信して、サーバIDを知る
  ReplicateResponse response;
  if (!stream_->Read(&response)) {
    healthy_.store(false);
    const std::string message = GRPCStatusString(stream_->Finish());
    return Status(Status::NETWORK_ERROR, StrCat("Read failed: ", message));
  }
  if (response.op_type() != ReplicateResponse::OP_NOOP) {
    return Status(Status::BROKEN_DATA_ERROR, "invalid operation type");
  }
  server_id_ = response.server_id();
  return Status(Status::SUCCESS);
}

ReplicatorのReadメソッドは、次の更新ログを受け取って、更新ログオブジェクトを設定する。

Status RemoteDBMReplicatorImpl::Read(int64_t* timestamp, RemoteDBM::ReplicateLog* op) {
  ...
  // レスポンスを受信
  ReplicateResponse response;
  if (!stream_->Read(&response)) {
    healthy_.store(false);
    const std::string message = GRPCStatusString(stream_->Finish());
    return Status(Status::NETWORK_ERROR, StrCat("Read failed: ", message));
  }
  // 結果の構築
  *timestamp = response.timestamp();
  delete[] op->buffer_;
  switch (response.op_type()) {
    case ReplicateResponse::OP_SET:
      op->op_type = DBMUpdateLoggerMQ::OP_SET;
      break;
    case ReplicateResponse::OP_REMOVE:
      op->op_type = DBMUpdateLoggerMQ::OP_REMOVE;
      break;
    case ReplicateResponse::OP_CLEAR:
      op->op_type = DBMUpdateLoggerMQ::OP_CLEAR;
      break;
    default:
      op->op_type = DBMUpdateLoggerMQ::OP_VOID;
      break;
  }
  op->server_id = response.server_id();
  op->dbm_index = response.dbm_index();
  op->buffer_ = new char[response.key().size() + response.value().size() + 1];
  char* wp = op->buffer_;
  std::memcpy(wp, response.key().data(), response.key().size());
  op->key = std::string_view(wp, response.key().size());
  wp += response.key().size();
  std::memcpy(wp, response.value().data(), response.value().size());
  op->value = std::string_view(wp, response.value().size());
  return MakeStatusFromProto(response.status());
}

サーバ内のスレーブの実装に話を戻す。レプリケーションのスレーブ機能はサーバの本分であるリクエストへの応答とは本質的に異なる機能なので、別スレッドで動作させる。サーバが作られた時にスレッドを起動し、死ぬ前にスレッドを停止する。すなわち、コンストラクタでスレッドを立てて、デストラクタで停止させる。

class DBMServiceBase {
 public:
  DBMServiceBase(...) {
    StartManager();
  }
  ~DBMServiceBase() {
    StopManager();
  }
  void StartManager() {
   // レプリケーションスレッドを立てる
    thread_repl_manager_ = std::thread([&]{ ManageReplication(); });
  }
  void StopManager() {
    // このフラグが立っていたら、レプリケーションスレッドは自殺する
    alive_.store(false);
    // レプリケーションスレッドを回収する
    thread_repl_manager_.join();
  }
  ...
}

レプリケーションスレッドの実装はこうなる。レプリケーションのマスタは運用中に動的に変更されるので、それを検出してレプリケーションの停止と再開ができるようにしてある。

class DBMServiceBase {
 public:
  void ManageReplication() {
    // マスタから貰った最大のタイムスタンプ。次回起動時はここから再開する
    int64_t max_timestamp = 0;
    // レプリケーションのパラメータ
    ReplicationParameters params;
    // 停止指示があるまでループ
    while (alive_.load()) {
      // ビジーループにならないように休む
      SleepThread(1.0);
      {
        // アトミックに現在の設定を調べるためにロックをかける
        std::lock_guard<SpinMutex> lock(mutex_);
        // マスタが設定されていなければ、レプリケーションはしない
        if (repl_params_.master.empty()) {
          continue;
        }
        // 再設定されたかもしれないレプリケーションパラメータをスタックオブジェクトに複製
        params = repl_params_;
        params.min_timestamp = std::max(max_timestamp, params.min_timestamp);
        // パラメータが再設定された場合、それを反映する
        if (refresh_repl_manager_) {
          refresh_repl_manager_.store(false);
          params.min_timestamp = std::max<int64_t>(0, params.min_timestamp + repl_ts_skew_);
          repl_ts_skew_ = 0;
          logger_->LogCat(Logger::LEVEL_INFO, "Replicating ", params.master,
                          " with the min_timestamp ", params.min_timestamp);
          success = true;
        }
      }
      // 実際のセッションはDoReplicationSessionに委譲する
      success = DoReplicationSession(&params, success);
      // 最大のタイムスタンプを保存して、次回のセッションでも使う
      max_timestamp = std::max(max_timestamp, params.min_timestamp);
    }
    logger_->Log(Logger::LEVEL_DEBUG, "The replicatin manager finished");
  }
  ...
};

レプリケーションの個々のセッションは、以下の実装になる。

class DBMServiceBase {
 public:
  bool DoReplicationSession(ReplicationParameters* params, bool success) {
    // マスタに接続する
    RemoteDBM master;
    Status status = master.Connect(params->master);
    if (status != Status::SUCCESS) {
      // 連続して失敗してもログは1回しか出さない
      if (success) {
        logger_->Log(Logger::LEVEL_WARN, "unable to reach the master");
      }
      return false;
    }
    if (!success) {
      logger_->LogCat(Logger::LEVEL_INFO, "Reconnected to ", params->master,
                      " with the min_timestamp ", params->min_timestamp);
    }
    // レプリケーションクライアントを作り、タイムスタンプとサーバIDを指定して開始
    auto repl = master.MakeReplicator();
    status = repl->Start(params->min_timestamp, server_id_, params->wait_time);
    if (status != Status::SUCCESS) {
      logger_->LogCat(Logger::LEVEL_WARN, "replication error: ", status);
      return false;
    }
    // マスタのサーバIDを得ておく
    const int32_t master_id = repl->GetMasterServerID();
    RemoteDBM::ReplicateLog op;
    int64_t count = 0;
    // サーバが死ぬか、パラメータが書き換えられるまで、ループする
    while (alive_.load() && !refresh_repl_manager_.load()) {
      // 次の更新ログを読み込む
      int64_t timestamp = 0;
      status = repl->Read(&timestamp, &op);
      if (status == Status::SUCCESS) {
        if (count == 0) {
          logger_->LogCat(Logger::LEVEL_INFO, "replication start: master_id=", master_id,
                          ", timestamp=", timestamp);
        }
        // データベースインデックスの変域を確認
        if (op.dbm_index < 0 || op.dbm_index >= static_cast<int32_t>(dbms_.size())) {
          logger_->LogCat(Logger::LEVEL_ERROR, "out-of-range DBM index");
          return true;
        }
        // サーバIDが重複しないことを確認
        if (op.server_id == server_id_) {
          logger_->LogCat(Logger::LEVEL_ERROR, "duplicated server ID");
          return true;
        }
        // インデックスに対応するデータベースを取得
        DBM* dbm = dbms_[op.dbm_index].get();
        // タイムスタンプを更新
        params->min_timestamp = std::max(timestamp, params->min_timestamp);
        // 更新ログの内容に応じてデータベースを更新
        switch (op.op_type) {
          case DBMUpdateLoggerMQ::OP_SET: {
            logger_->LogCat(Logger::LEVEL_DEBUG, "replication: ts=", timestamp,
                            ", server_id=", op.server_id, ", dbm_index=", op.dbm_index,
                            ", op=SET");
            // サーバIDをマスタのものに一時的に書き換えてから更新を行う
            DBMUpdateLoggerMQ::OverwriteThreadServerID(master_id);
            status = dbm->Set(op.key, op.value);
            DBMUpdateLoggerMQ::OverwriteThreadServerID(-1);
            if (status != Status::SUCCESS) {
              logger_->LogCat(Logger::LEVEL_ERROR, "Set failed: ", status);
              return true;
            }
            break;
          }
          case DBMUpdateLoggerMQ::OP_REMOVE: {
            logger_->LogCat(Logger::LEVEL_DEBUG, "replication: ts=", timestamp,
                            ", server_id=", op.server_id, ", dbm_index=", op.dbm_index,
                            ", op=REMOVE");
            // サーバIDをマスタのものに一時的に書き換えてから更新を行う
            DBMUpdateLoggerMQ::OverwriteThreadServerID(master_id);
            status = dbm->Remove(op.key);
            DBMUpdateLoggerMQ::OverwriteThreadServerID(-1);
            if (status != Status::SUCCESS && status != Status::NOT_FOUND_ERROR) {
              logger_->LogCat(Logger::LEVEL_ERROR, "Remove failed: ", status);
              return true;
            }
            break;
          }
          case DBMUpdateLoggerMQ::OP_CLEAR: {
            logger_->LogCat(Logger::LEVEL_DEBUG, "replication: ts=", timestamp,
                            ", server_id=", op.server_id, ", dbm_index=", op.dbm_index,
                            ", op=CLEAR");
            // サーバIDをマスタのものに一時的に書き換えてから更新を行う
            DBMUpdateLoggerMQ::OverwriteThreadServerID(master_id);
            status = dbm->Clear();
            DBMUpdateLoggerMQ::OverwriteThreadServerID(-1);
            if (status != Status::SUCCESS) {
              logger_->LogCat(Logger::LEVEL_ERROR, "Clear failed: ", status);
              return true;
            }
            break;
          }
          default:
            break;
        }
        // 定期的にタイムスタンプを書き出す
        if (count % TIMESTAMP_FILE_SYNC_FREQ == 0) {
          SaveTimestamp(*params);
        }
        count++;
      } else if (status == Status::INFEASIBLE_ERROR) {
        // タイムアウトした場合には、タイムスタンプだけを更新する
        params->min_timestamp = std::max(timestamp, params->min_timestamp);
      } else {
        logger_->LogCat(Logger::LEVEL_WARN, "replication error: ", status);
        break;
      }
    }
    // 最後は必ずタイムスタンプを書き出す
    SaveTimestamp(*params);
    return true;
  }
  ...
};

スレーブ側の実装のポイントは、しつこいようだが、タイムスタンプの管理にある。サーバ側から受け取った値のみを使って、自分の時計は参照しないのが重要だ。また、留意すべきは、自分のローカルに保存する更新ログには自分の時計で生成したタイムスタンプが使われるということだ。自分の更新ログの管理はDBMの内部に仕掛けられたメッセージキューの責任なので、ここでは気にしなくて良い。

また、更新ログのサーバIDを書き換えていることにも注意だ。レプリケーションを経由しない通常の更新では、更新ログには自分のサーバIDが書かれる。レプリケーションの場合にはそれだと困るので、OverwriteThreadServerIDというダサい名前のメソッドでスレッドローカル変数に上書き処理を指示することで、サーバIDを変更している。これはサーバIDを指定する機能がDBMに無いことに対するワークアラウンドなのだが、ダサいのは否めない。ともかく、これをしないとデュアルマスタ構成が組めないのだ。

非常に長くなったが、こんな感じで、gRPCベースでデータベースの非同期レプリケーション処理が実装できる。実際のコードはtkrzw_server_tmpl.htkrzw_dbm_remote.ccをご覧いただきたい。改めて思うのは、gRPCのアプリケーションって、やり方を知ってれば簡単に書けるのだけれど、知らないとマジで嵌まるということだ。自分が次に嵌まらないようにここにメモを記したのだ。

今後の展望

最終回なので、今後の展望について述べておこう。まずは、各言語用のクライアントライブラリを書かねばならない。C++のライブラリをラップするか、各言語のgRPCライブラリを使って書くかは状況に応じて考える。自分の趣味というか修行という観点では、各言語のgRPCライブラリを学んだ方が良さそうだが、性能の観点でどちらが良いかは、その言語とライブラリにもよるので悩ましいところだ。gRPCの認証機構もまだ実装していないので、そのあたりを作り込む余地がある。

サーバ自身の機能に関しては、同期レプリケーションのサポートというのが考えられる。マスタからスレーブに更新をプッシュする方式であれば、2相コミットなどの複雑なプロトコルも実装でき、信頼性とレイテンシのバランスを任意に調整できる。あるいは、より簡単かつ高効率な、「準同期」的な方法もある。更新ログが転送された時点でレプリケーション成功とみなすのだ。スレーブ毎の転送済みタイムスタンプをサーバ側で管理しておいて、N個以上のスレーブのタイムスタンプが今書き込んだ更新ログを上回るまで待てばよい。転送されたとしても到達したとは限らないので絶対の信頼性とは言えないが、マスタ自身の障害とネットワーク障害が同時に起こる確率は低いので、実際の信頼性は高い。中途半端な感じは否めないが、非同期レプリケーションの高信頼化オプション的な位置づけならばありかな。

あとは、死活監視やバックアップやデプロイやサーバの起動を包括的に管理するシステムが欲しくはなる。しかし、そのあたりの最適解はサービス設計に依存するので、まずは自分で何らかのサービスを運用してそれに適用しながら考えるべきだろう。

おまけ

日本語のスライドを作ってみた。このTkrzw-RPCの概要がまとめてあるので、ご覧頂きたい。

docs.google.com