豪鬼メモ

一瞬千撃

DBサービスを作ろう その3 ストリームAPIでイテレータを実装

前回の記事で、GetやSetなどの、通常のデータベースの個々のレコードを操作するAPIを実装した。CountやRebuildなどの、データベース全体に対する操作も同様に実装できる。しかし、データベース内を横断して個々のレコードを取り出すイテレータは、進捗状態を持ち回さねばならないので、同じ方法では実装しづらい。そこで、gRPCのストリームAPIを利用することにした。その設計と実装とテスト方法を紹介する。


DBMのイテレータは、データベース内の個々のレコードを取り出す機能だが、普通のAPI(unary API)で実装するのは難しい。1回のリクエストに1回のレスポンスという構造でやろうとすると、全てのレコードのデータをレスポンスに入れることになるが、それは現実的ではない。イテレータの定石通り、「次のレコードを取り出す」という操作を何度も実行する構造にしたいが、その場合は、「どこまで進んだか」という情報をリクエスト間で使い回さねばならない。となると、セッション的な概念が生じて、そのセッション情報を誰の責任で管理するか考えなければならない。

gRPCのストリームAPIを使うと、この問題をうまく解決できる。ストリームは双方向にでき、クライアントからサーバに複数回に分けてデータを送れるし、同時にサーバからクライアントに複数回に分けてデータを送れる。ピンポン的に同期したやりとりもできる。サーバがクライアントからリクエストを読み込み、それに対応したレスポンスを書き込むという手順だけ決めておくと、任意のセッション操作が実現できる。FTPみたいな感じと言えば分かりやすいかもしれない。ともかく、gRPCでは単一の接続を維持しつつセッション的なやり取りができるということを念頭において、設計を進めよう。

イテレータAPIは以下のようになる。完全なAPIについてはtkrzw_dbm_remote.hをご覧頂きたいが、イテレータAPIもその他のAPIも、ローカルのAPIとほとんど同じになるようにしている。

class RemoteDBM {
 public:
  // そのデータベース内のレコードを巡回するイテレータを作る
  Iterator MakeIterator();
};

class Iterator {
 public:
   // イテレータをデータベースの先頭に飛ばす。
   Status First();

   // イテレータを次のレコードに進める。
   Status Next();

   // 現在のレコードのキーと値を取り出す。
   Status Get(std::string* key, std::string* value);
};

上記の操作をRPCのストリームAPIでやろうとすると、そのサービス定義の概要は以下のようになる。tkrzw_rpc.protoに完全な定義がある。

// イテレータセッション内の個々のリクエスト
message IterateRequest {
  // 操作の種類を示す列挙型
  enum OpType {
    // イテレータを最初の位置に飛ばす
    OP_FIRST = 1;
    // イテレータを次の位置に進める
    OP_NEXT = 6;
    // 現在のレコードのデータを取り出す
    OP_GET = 8;
  }
}

// イテレータセッション内の個々のレスポンス
message IterateResponse {
  // ステータスコードとメッセージの構造体
  StatusProto status = 1;
  // 取得したキー
  bytes key = 2;
  // 取得した値
  bytes value = 3;
}

service DBMService {
  イテレータのメソッド。入力も出力もstreamとして扱う
  rpc Iterate(stream IterateRequest) returns (stream IterateResponse);
}

上記のprotoファイルをビルドすると、Iterateというメソッドが定義される。まずはサーバ側を実装しよう。クライアントからサーバに送られるリクエストとサーバからクライアントに送り返すレスポンスの双方を扱うためのgrpc::ServerReaderWriterというクラスのオブジェクトが引数として渡される。完全な実装はtkrzw_server_impl.hにあるが、概要は以下のようになる。

class DBMServiceImpl : public DBMService::Service {
 public:
  grpc::Status Iterate(
      grpc::ServerContext* context,
      grpc::ServerReaderWriter<tkrzw::IterateResponse, tkrzw::IterateRequest>* stream) {
    // ローカルデータベースのイテレータのプレースホルダ
    std::unique_ptr<DBM::Iterator> iter;

    // セッションが終わるまでループ
    while (true) {
      // キャンセルされたら抜ける
      if (context->IsCancelled()) {
        return grpc::Status(grpc::StatusCode::CANCELLED, "cancelled");
      }
      // クライアントからレクエストを受け取る。なければ抜ける
      tkrzw::IterateRequest request;
      if (!stream->Read(&request)) {
        break;
      }
      LogRequest(context, "Iterate", &request);

      // セッション内の最初のリクエストであれば、ローカルのイテレータを生成する
      if (iter == nullptr) {
        if (request.dbm_index() < 0 ||
            request.dbm_index() >= static_cast<int32_t>(dbms_.size())) {
          return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "dbm_index is out of range");
        }
        auto& dbm = *dbms_[request.dbm_index()];
        iter = dbm.MakeIterator();
      }

      // リクエストの中のoperationに応じた処理を行い、レスポンスを設定する。
      tkrzw::IterateResponse response;
      switch (request.operation()) {
        // レコードを最初の位置に飛ばす
        case IterateRequest::OP_FIRST: {
          const Status status = iter->First();
          response.mutable_status()->set_code(status.GetCode());
          response.mutable_status()->set_message(status.GetMessage());
          break;
        }
        // レコードを次の位置に進める
        case IterateRequest::OP_NEXT: {
          const Status status = iter->Next();
          response.mutable_status()->set_code(status.GetCode());
          response.mutable_status()->set_message(status.GetMessage());
          break;
        }
        // 現在のレコードのキーと値を取り出す
        case IterateRequest::OP_GET: {
          std::string key, value;
          const Status status = iter->Get(
              request.omit_key() ? nullptr : &key, request.omit_value() ? nullptr : &value);
          response.mutable_status()->set_code(status.GetCode());
          response.mutable_status()->set_message(status.GetMessage());
          if (status == Status::SUCCESS) {
            response.set_key(key);
            response.set_value(value);
          }
          break;
        }
      }

      // レスポンスを送信する
      if (!stream->Write(response)) {
        break;
      }
    }
    return grpc::Status::OK;
  }
};

クライアント側の実装は以下のようになる。サーバ側とは逆で、クライアント側からサーバ側にリクエストを書き込み、サーバ側から送られたレスポンスを読み出すための、grpc::ClientReaderWriter型のオブジェクトを使う。完全なコードはtkrzw_dbm_remote.ccにある。メソッドの種類が多いので大変そうに見えるが、個々のメソッドは単純かつ美しく書ける。

// イテレータのコンストラクタ。スタブから返されたストリームを格納
RemoteDBMIteratorImpl::RemoteDBMIteratorImpl(RemoteDBMImpl* dbm)
    : dbm_(dbm), context_(), stream_(nullptr) {
  stream_ = dbm_->stub_->Iterate(&context_);
}

// イテレータのデストラクタ。接続を切断
RemoteDBMIteratorImpl::~RemoteDBMIteratorImpl() {
  stream_->WritesDone();
  stream_->Finish();
}

// Firstメソッドの実装。Nextもほぼ同じ
Status RemoteDBMIteratorImpl::First() {
  // リクエストデータを作る
  IterateRequest request;
  request.set_operation(IterateRequest::OP_FIRST);
  // リクエストデータを送信する
  if (!stream_->Write(request)) {
    return Status(Status::NETWORK_ERROR, "Write failed");
  }
  // レスポンスデータを受信する
  IterateResponse response;
  if (!stream_->Read(&response)) {
    return Status(Status::NETWORK_ERROR, "Read failed");
  }
  // 結果のステータスを返す
  return MakeStatusFromProto(response.status());
}

// Getメソッドの実装
Status RemoteDBMIteratorImpl::Get(std::string* key, std::string* value) {
  // リクエストデータを作る
  IterateRequest request;
  request.set_operation(IterateRequest::OP_GET);
  request.set_omit_key(true);
  request.set_omit_value(true);
  // リクエストデータを送信する
  if (!stream_->Write(request)) {
    return Status(Status::NETWORK_ERROR, "Write failed");
  }
  // レスポンスデータを受信する
  IterateResponse response;
  if (!stream_->Read(&response)) {
    return Status(Status::NETWORK_ERROR, "Read failed");
  }
  // 結果のデータとステータスを返す
  if (response.status().code() == 0) {
    *key = response.key();
    *value = response.value();
  }
  return MakeStatusFromProto(response.status());
}

クライアントのユニットテストは以下のように書く。実際のコードはtkrzw_dbm_remote_test.ccになる。要点は、"grpcpp/test/mock_stream.h" をincludeすると使えるようになるgrpc::testing::MockClientReaderWriterでストリームのモックを作ることだ。

TEST_F(RemoteDBMTest, IterateMove) {
  // ストリームのモックを作る
  auto stream = std::make_unique<grpc::testing::MockClientReaderWriter<
    tkrzw::IterateRequest, tkrzw::IterateResponse>>();

  // 期待されるリクエストのデータを準備する
  tkrzw::IterateRequest request_first;
  request_first.set_operation(tkrzw::IterateRequest::OP_FIRST);
  tkrzw::IterateRequest request_next;
  request_next.set_operation(tkrzw::IterateRequest::OP_NEXT);
  tkrzw::IterateRequest request_get;
  request_get.set_operation(tkrzw::IterateRequest::OP_GET);
  request_get.set_key("key");
  request_get.set_value("value");

  // 期待されるレスポンスのデータを準備する。デフォルト状態で成功を意味する
  tkrzw::IterateResponse response;

  // リクエストは3回に分けて呼ばれることを期待する
  EXPECT_CALL(*stream, Write(EqualsProto(request_get_both), _)).WillOnce(Return(true));
  EXPECT_CALL(*stream, Write(EqualsProto(request_get_none), _)).WillOnce(Return(true));
  EXPECT_CALL(*stream, Write(EqualsProto(request_get_key), _)).WillOnce(Return(true));

  // レスポンスは常に成功を返すことを期待する
  EXPECT_CALL(*stream, Read(_))
    .WillRepeatedly(DoAll(SetArgPointee<0>(response), Return(true)));

  // セッション終了時にはWritesDoneとFinishが呼ばれることを期待する
  EXPECT_CALL(*stream, WritesDone()).WillOnce(Return(true));
  EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));

  // Iterateメソッドが呼ばれることを期待しつつ、モックのストリームを仕込む
  auto stub = std::make_unique<tkrzw::MockDBMServiceStub>();
  EXPECT_CALL(*stub, IterateRaw(_)).WillRepeatedly(Return(stream.release()));

  // 各APIを呼びだし、結果を確認する
  tkrzw::RemoteDBM dbm;
  dbm.InjectStub(stub.release());
  auto iter = dbm.MakeIterator();
  std::string key, value;
  EXPECT_EQ(tkrzw::Status::SUCCESS, iter->First());
  EXPECT_EQ(tkrzw::Status::SUCCESS, iter->Next());
  EXPECT_EQ(tkrzw::Status::SUCCESS, iter->Get(&key, &value));
  EXPECT_EQ("key", key);
  EXPECT_EQ("value", value);
}

サーバ側のユニットテストは、直接は書けない。protocが直接定義するIterateメソッドはgrpc::ServerReaderWriterを受け取るシグネチャになっているのだが、これはfinalだしコンストラクタもprivateなので、モック化できない。よって、grpc::ServerReaderWriterInterfaceを受け取るIterateImplというメソッドに処理を移譲するようにして、そのIterateImplをテストするように改変する。

class DBMServiceImpl : public DBMService::Service {
 public:
  grpc::Status Iterate(
      grpc::ServerContext* context,
      grpc::ServerReaderWriter<tkrzw::IterateResponse, tkrzw::IterateRequest>* stream) override {
    return IterateImpl(context, stream);
  }

  grpc::Status IterateImpl(grpc::ServerContext* context,
      grpc::ServerReaderWriterInterface<
                           tkrzw::IterateResponse, tkrzw::IterateRequest>* stream) {
    ...
  }
};

ストリームのモックを作る。GitHubにあるgRPCの最新版のmock_stream.hにはMockServerReaderWriterというクラスがあるのだが、Ubuntuのパッケージのバージョンだとまだない。よって、自分で書く。

template <class W, class R>
class MockServerReaderWriter : public grpc::ServerReaderWriterInterface<W, R> {
 public:
  MockServerReaderWriter() = default;
  MOCK_METHOD0_T(SendInitialMetadata, void());
  MOCK_METHOD1_T(NextMessageSize, bool(uint32_t*));
  MOCK_METHOD1_T(Read, bool(R*));
  MOCK_METHOD2_T(Write, bool(const W&, const grpc::WriteOptions));
};

テストケースを書く。実際のコードはtkrzw_server_test.ccを参照のこと。

TEST_F(ServerTest, IteratorSimple) {
  // データベースを準備する
  tkrzw::TemporaryDirectory tmp_dir(true, "tkrzw-");
  const std::string file_path = tmp_dir.MakeUniquePath();
  std::vector<std::unique_ptr<tkrzw::ParamDBM>> dbms(1);
  dbms[0] = std::make_unique<tkrzw::PolyDBM>();
  const std::map<std::string, std::string> params =
      {{"dbm", "TreeDBM"}, {"num_buckets", "10"}};
  EXPECT_EQ(tkrzw::Status::SUCCESS,
            dbms[0]->OpenAdvanced(file_path, true, tkrzw::File::OPEN_DEFAULT, params));
  // 00000001から00000010までのレコードを格納する
  for (int32_t i = 1; i <= 10; i++) {
    const std::string key = tkrzw::SPrintF("%08d", i);
    const std::string value = tkrzw::ToString(i * i);
    EXPECT_EQ(tkrzw::Status::SUCCESS, dbms[0]->Set(key, value));
  }

  // 実行環境の準備
  tkrzw::StreamLogger logger;
  tkrzw::DBMServiceImpl server(dbms, &logger);
  grpc::ServerContext context;
  MockServerReaderWriter<tkrzw::IterateResponse, tkrzw::IterateRequest> stream;

  // リクエストデータ
  tkrzw::IterateRequest request_get;
  request_get.set_operation(tkrzw::IterateRequest::OP_GET);
  tkrzw::IterateRequest request_first;
  request_first.set_operation(tkrzw::IterateRequest::OP_FIRST);
  tkrzw::IterateRequest request_last;
  request_last.set_operation(tkrzw::IterateRequest::OP_LAST);
  tkrzw::IterateRequest request_next;
  request_next.set_operation(tkrzw::IterateRequest::OP_NEXT);

  // 移動系操作のレスポンスは常に成功を期待
  tkrzw::IterateResponse response_move;
  // Firstの直後のレコード取得
  tkrzw::IterateResponse response_get_first;
  response_get_first.set_key("00000001");
  response_get_first.set_value("1");
  // Lastの直後のレコード取得
  tkrzw::IterateResponse response_get_last;
  response_get_last.set_key("00000010");
  response_get_last.set_value("100");
  // Nextの直後のレコード取得は失敗
  tkrzw::IterateResponse response_get_next;
  response_get_next.mutable_status()->set_code(tkrzw::Status::NOT_FOUND_ERROR);

  // First, Get, Last, Get, Next, Getを呼んで、終了
  EXPECT_CALL(stream, Read(_))
      .WillOnce(DoAll(SetArgPointee<0>(request_first), Return(true)))
      .WillOnce(DoAll(SetArgPointee<0>(request_get), Return(true)))
      .WillOnce(DoAll(SetArgPointee<0>(request_last), Return(true)))
      .WillOnce(DoAll(SetArgPointee<0>(request_get), Return(true)))
      .WillOnce(DoAll(SetArgPointee<0>(request_next), Return(true)))
      .WillOnce(DoAll(SetArgPointee<0>(request_get), Return(true)))
      .WillOnce(Return(false));

  // それぞれのレスポンスを登録
  EXPECT_CALL(stream, Write(EqualsProto(response_move), _)).WillRepeatedly(Return(true));
  EXPECT_CALL(stream, Write(EqualsProto(response_get_first), _)).WillOnce(Return(true));
  EXPECT_CALL(stream, Write(EqualsProto(response_get_last), _)).WillOnce(Return(true));
  EXPECT_CALL(stream, Write(EqualsProto(response_get_next), _)).WillOnce(Return(true));

  // Iterateの代わりにIterateImplを呼び出す
  grpc::Status status = server.IterateImpl(&context, &stream);
  EXPECT_TRUE(status.ok());
  EXPECT_EQ(tkrzw::Status::SUCCESS, dbms[0]->Close());
}

結合テストフレームワークはまだ整備していないので、とりあえずはコマンドラインユーティリティにイテレータ機能を組み込んで、end-to-endで確認できるようにしよう。tkrzw_dbm_remote_util.ccを書き換えて、listというサブコマンドを実装する。

static int32_t ProcessList(int32_t argc, const char** args) {
  ...
  RemoteDBM dbm;
  Status status = dbm.Connect(host, port);
  ...
  auto iter = dbm.MakeIterator();
  ...
  const Status status = iter->First();
  if (status != Status::SUCCESS) {
    EPrintL("First failed: ", status);
  }
  for (int64_t count = 0; ok && count < num_items; count++) {
    std::string key, value;
    Status status = iter->Get(&key, &value);
    if (status != Status::SUCCESS) {
      if (status != Status::NOT_FOUND_ERROR) {
        EPrintL("Get failed: ", status);
        ok = false;
      }
      break;
    }
    PrintL(key, "\t", value);
    status = iter->Next();
    if (status != Status::SUCCESS) {
      EPrintL("Next failed: ", status);
      ok = false;
      break;
    }
  }
  dbm.Disconnect();
  return ok ? 0 : 1;
}

さて、動作確認しよう。例によって、サーバとクライアントを別々の端末で操作する。

$ ./tkrzw_server --log_level debug "#dbm=baby"
2021/08/29 20:14:06 [INFO] ==== Starting the process as a command ====
2021/08/29 20:14:06 [INFO] Opening a database: #dbm=baby
2021/08/29 20:14:06 [INFO] address=0.0.0.0:1978, pid=6253
2021/08/29 20:14:49 [DEBUG] ipv4:127.0.0.1:57770 [Set] key: "00001" value: "apple" overwrite: true
2021/08/29 20:14:53 [DEBUG] ipv4:127.0.0.1:57772 [Set] key: "00002" value: "lemon" overwrite: true
2021/08/29 20:14:59 [DEBUG] ipv4:127.0.0.1:57776 [Set] key: "00003" value: "peach" overwrite: true
2021/08/29 20:15:19 [DEBUG] ipv4:127.0.0.1:57778 [Set] key: "00004" value: "grape" overwrite: true
2021/08/29 20:15:29 [DEBUG] ipv4:127.0.0.1:57780 [Set] key: "00005" value: "melon" overwrite: true
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_FIRST
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_GET
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_NEXT
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_GET
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_NEXT
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_GET
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_NEXT
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_GET
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_NEXT
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_GET
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_NEXT
2021/08/29 20:15:37 [DEBUG] ipv4:127.0.0.1:57782 [Iterate] operation: OP_GET
2021/08/29 20:17:20 [DEBUG] ipv4:127.0.0.1:57788 [Iterate] operation: OP_JUMP key: "00003"
2021/08/29 20:17:20 [DEBUG] ipv4:127.0.0.1:57788 [Iterate] operation: OP_GET
2021/08/29 20:17:20 [DEBUG] ipv4:127.0.0.1:57788 [Iterate] operation: OP_NEXT
2021/08/29 20:17:20 [DEBUG] ipv4:127.0.0.1:57788 [Iterate] operation: OP_GET
2021/08/29 20:17:20 [DEBUG] ipv4:127.0.0.1:57788 [Iterate] operation: OP_NEXT
2021/08/29 20:17:20 [DEBUG] ipv4:127.0.0.1:57788 [Iterate] operation: OP_GET
2021/08/29 20:17:20 [DEBUG] ipv4:127.0.0.1:57788 [Iterate] operation: OP_NEXT
2021/08/29 20:17:20 [DEBUG] ipv4:127.0.0.1:57788 [Iterate] operation: OP_GET
2021/08/29 20:17:44 [INFO] Shutting down by signal: 2
2021/08/29 20:17:44 [INFO] The server finished
2021/08/29 20:17:44 [INFO] Closing a database
2021/08/29 20:17:44 [INFO] ==== Ending the process in success ====
$ ./tkrzw_dbm_remote_util set 00001 apple
$ ./tkrzw_dbm_remote_util set 00002 lemon
$ ./tkrzw_dbm_remote_util set 00003 peach
$ ./tkrzw_dbm_remote_util set 00004 grape
$ ./tkrzw_dbm_remote_util set 00005 melon
$ ./tkrzw_dbm_remote_util list
00001	apple
00002	lemon
00003	peach
00004	grape
00005	melon
$ ./tkrzw_dbm_remote_util list --move jump --jump_key 00003
00003	peach
00004	grape
00005	melon

ということで、ちゃんとストリームAPIが呼ばれて、個々のリクエストが処理されて、期待通りの結果が帰ってきていることが確認できた。

ログを見ると気付くと思うが、この双方向ストリームの方法だと、サーバとクライアントの間で「ピンポン」応答が繰り返されるので、スループットの点では最善ではない。データのエクスポート(ダウンロード)に特化するなら、サーバからクライアントへの片方向ストリームであるサーバストリームを使うべきだし、データのインポート(アップロード)に特化するならクライアントからサーバへの片方向ストリームであるクライアントストリームを使うべきだ。それらについては追って検討することになるだろう。レプリケーション機能はサーバストリームとして実装できるはずだ。


まとめ。gRPCのストリームAPIを使ってデータベースサービスのイテレータを作った。ストリームAPIは大量のデータを処理するのに便利だが、対話的に処理を進めるのにも使える。ストリーム内の一連の操作はサーバ側でもクライアント側でも単一のコンテキストで実行されるので、セッション的な処理を実装するのが簡単なのだ。ユニットテストを書くのがだいぶ面倒くさいというか、gRPCのテスト方法に関するまとまった情報がないのが玉に瑕ではある。

イテレータの実装が完了したので、DBMサービスとしての基本機能は全て揃ったことになる。あとは性能テストと機能テストをもうちょいやって、ドキュメントを書き書きすれば、ひとまずの完成ということになるだろう。名前はTkrzw-RPCのままで良いかな。Hgsmrymって案もあるのだが。