豪鬼メモ

一瞬千撃

Tkrzw-RPCのRubyクライアントライブラリ

前回の記事Pythonのクライアントライブラリを紹介したが、Rubyのクライアントライブラリも実装できたので、今回はそれを紹介する。詳しいことはAPI文書をご覧いただきたい。


使用人口が多い言語から対応するようなことを言っていたので、それなら次はJavaに対応するのが妥当だが、先にRubyをやることにした。gRPCのチュートリアルを読んで、書きやすそうだったからだ。gRPCのAPIが分かりやすいというのも重要だが、Makefileだけでビルドとインストールの設定ができるというのも私にとっては好ましい点だ。

Pythonの時と同じように、TkrzwのC++のコアライブラリには依存せずに、gRPCだけに依存するようにクライアントライブラリを書いた。よって、クライアント側ではTkrzwのコアライブラリをインストールする必要はない。とはいえ、動作確認するにはTkrzw-RPCのサーバが必要だ。なので、TkrzwとTkrzw-RPCは普通にmake, make installでインストールしておいていただく。gRPCのPythonライブラリは本家のページに書いてあるようにgemでインストールするのが楽だ。

$ sudo gem install grpc

Tkrzw-RPC-Rubyのインストールも普通にmake, make installでやればOK。configureは不要。

$ tar zxvf tkrzw-rpc-ruby-0.1.2.tar.gz
$ cd tkrzw-rpc-ruby-0.1.2
$ make
$ sudo make install

サンプルプログラムを実行する前に、Tkrzw-RPCのサーバを立てる。以下のコマンドを実行すればよい。デフォルトでは、ローカルホストの1978番ポートで、オンメモリデータベースをサーブする。デバッグログを出力して、どんな処理が行われているかをわかりやすくしておこう。

$ tkrzw_server --log_level debug

最も簡単なサンプルプログラムを示そう。RubyのHashとほぼ同じように使えるのがセールスポイントだ。

# Tkrzw-RPCのパッケージをインポートする
require 'tkrzw_rpc'

# データベースを準備する
dbm = TkrzwRPC::RemoteDBM.new
dbm.connect("localhost:1978")
dbm.clear
 
# レコードを追加する
# エラーがあると代入式全体がnilを返す
dbm["first"] = "hop"
dbm["second"] = "step"
dbm["third"] = "jump"
 
# レコードを検索する
# レコードがなければnilを返す
p dbm["first"]
p dbm["second"]
p dbm["third"]
p dbm["fourth"]
 
# eachとイテレータで横断アクセスが可能
dbm.each do |key, value|
  p key + ": " + value
end
 
# 接続を切って、リソースを解放する
dbm.disconnect
dbm.destruct

もうちょい複雑なサンプルも示そう。いちいちエラーチェックもやっている。

require 'tkrzw_rpc'

dbm = TkrzwRPC::RemoteDBM.new
begin
  # データベースを準備する
  # タイムアウトは秒単位で設定する
  status = dbm.connect("localhost:1978", 10)
  if not status.ok?
    raise TkrzwRPC::StatusException.new(status)
  end

  # 単一サーバで複数データベースを運用している場合、その操作対象を指定できる
  # デフォルトは0で、最初のデータベースが対象となっている
  dbm.set_dbm_index(0).or_die

  # レコードを追加する
  # Set等の多くのメソッドはStatusオブジェクトを返すが、そのor_dieメソッドは、
  # 成功状態ではない場合に自身を例外にして投げる
  dbm.set(1, "hop").or_die
  dbm.set(2, "step").or_die
  dbm.set(3, "jump").or_die
 
  # エラーチェックなしに検索を行う
  # 検索が失敗した場合にはNoneが返される
  p dbm.get(1)
  p dbm.get(2)
  p dbm.get(3)
  p dbm.get(4)
 
  # エラーチェックをしたい場合、statusオブジェクトを作って渡す
  # statusオブジェクトとステータスコードは直接比較できる
  status = TkrzwRPC::Status.new
  value = dbm.get(1, status)
  printf("status: %s\n", status)
  if status == TkrzwRPC::Status::SUCCESS
    printf("value: %s\n", value)
  end
 
  # データベースを再構築する
  dbm.rebuild
 
  # 横断的に全レコードにアクセスする
  begin
    iter = dbm.make_iterator
    iter.first
    while true do
      status = TkrzwRPC::Status.new
      record = iter.get(status)
      break if not status.ok?
      printf("%s: %s\n", record[0], record[1])
      iter.next
    end
  ensure
    # イテレータのリソースを解放する
    iter.destruct
  end

  # 接続を閉じる
  dbm.disconnect.or_die
ensure
  # データベースのリソースを解放する
  # 既に接続しているなら閉じる
  dbm.destruct
end

そんなわけで、TkrzwのローカルのAPIとほぼ同じ使用感になっているし、実際、Open/Closeの代わりにConnect/Disconnectになっていること以外は全く同じだ。ネットワークプログラミングをしている感じがほとんどコードに現れず、何ならデータベースを扱っている感じすらしないというのが良い感じではないか。

前回のPythonの例と比較すると、言語の特徴が見られて面白い。まず目に付くのは、Rubyだとリソース解放を自前で書かなきゃならないことだ。それを確実になすには、begin-ensure構文を使うことになる。Pythonだと参照が切れた時に発動するデストラクタが確実なリソース解放を担うが、RubyJavaでデストラクタやファイナライザに頼るわけにはいかないので、明示的に書く必要がある。

配列や連想配列を扱うためのインターフェイスである = 演算子のエラー処理方法が違うのも興味深い。Pythonでは、該当レコードがない場合には例外を投げることになっているが、Rubyでは静かにnilを返すことになっている。

実装で悩んだのは、例によって双方向ストリームを使った対話型のイテレータの実装においてだ。gRPCの層では、リクエストを送信するためのイテレータを渡すと、レスポンスを受け取るためのイテレータを返すというAPIになっている。それをAPI呼び出し毎にリクエストとレスポンスのやり取りがなされる対話型のインターフェイスに直さねばならない。

output_iterator = stub.iterate(input_iterator)

つまり、リクエスト用のイテレータとレスポンス用のイテレータを同期させて動作させねばならない。次のAPIの呼び出しを待ち、それに基づいたリクエストをリクエスイテレータ経由で送信し、そのレスポンスをレスポンスイテレータ経由で受け取るのだ。そのためには、リクエスイテレータAPIの呼び出しイベントを待ち、API呼び出しはリクエスイテレータにそれを通知するという機構が必要になる。Pythonの場合はthreading.Eventというクラスを使ってそれができたが、Rubyには(標準パッケージでは)それに対応するものが見つからなかったので、自分で書いた。

# イベントの待機と通知のためのクラス
class Event
  # 初期化。初期状態ではイベントは未発行
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @is_set = false
  end
  # イベントを発行し、その旨を通知する
  def set
    @mutex.synchronize {
      @is_set = true
    }
    @cond.signal
  end
  # イベントを受け取った側では、これを読んでフラグをクリアする
  def clear
    @mutex.synchronize {
      @is_set = false
    }
  end
  # イベントを待つ
  # 既にイベントが発行されていても、それを検出する
  def wait
    @mutex.synchronize {
      while not @is_set do
        @cond.wait(@mutex)          
      end
    }
  end
end

API呼び出しはイベントのsetを呼び、リクエスイテレータはそれをwaitする。ここで注意していただきたいのが、先にsetが呼び出されていてもwaitはそれを検出できるということだ。この点が普通の条件変数との重要な差異だ。これを使うと、リクエスイテレータは以下のように実装できる。

# リクエストを次々と返すイテレータ
class RequestIterator
  attr_reader :event, :request
  attr_writer :event, :request
  # 初期化。リクエストはnil
  def initialize
    @event = Event.new
    @request = nil
  end
  # イベントが起きた際に、リクエストがnilでなければ、それを出力してyieldする
  # イベントが起きた際に、リクエストがnilなら、ループから抜けて終了
  def each_item
    return enum_for(:each_item) unless block_given?
    loop {
      @event.wait
      @event.clear
      if @request
        yield @request
      end
    }
  end
end

あとは、データベースのイテレータAPIは以下のように実装できる。初期化時にリクエスイテレータを作り、それをgRPCのスタブに渡してレスポンスイテレータを得る。各APIでは、リクエスイテレータにリクエストを設定してからイベントを発行し、その後にレスポンスイテレータからレスポンスを一つ取り出す。実際の実装はtkrzw_rpc.rbをご覧いただきたい。

class Iterator
  # リクエストイテレータとレスポンスイテレータを作る
  def initialize(dbm)
    if not dbm.channel
      raise StatusException.new(Status.new(Status.PRECONDITION_ERROR, "not opened connection"))
    end
    @dbm = dbm
    @req_it = RequestIterator.new
    begin
      @res_it = dbm.stub.iterate(@req_it.each_item)
    rescue GRPC::BadStatus
      @dbm = None
      @req_it = None
    end
  end
  # カーソルをデータベースの最初に移動させるAPI
  def first
    # リクエストを作る
    request = IterateRequest.new
    request.dbm_index = @dbm.dbm_index
    request.operation = IterateRequest::OpType::OP_FIRST
    begin
     # リクエストイテレータにリクエストを設定し、イベントを通知
      @req_it.request = request
      @req_it.event.set
      # レスポンスイテレータからそのレスポンスを取得
      response = @res_it.next
      @req_it.request = nil
    rescue GRPC::BadStatus => error
      return Status.new(Status::NETWORK_ERROR, str_grpc_error(error))
    end
    # レスポンスを返す
    return make_status_from_proto(response.status)
  end
  ...
end

あと、チャンネルを明示的に閉じる方法にもちょっと悩んだ。gRPCのチュートリアルだと、チャンネルを作成せずにスタブをいきなり作成するようになっている。これだとチャンネルオブジェクトが手に入らないので、チャンネルが閉じられないのだ。

stub = DBMService::Stub.new(address, :this_channel_is_insecure)

よって、チャンネルオブジェクトを先に作っておいて、それをスタブのコンストラクタに渡すことになる。私が探した限り、この方法以外でチャンネルを閉じる方法が見つからなかった。まともなプログラマなら、開いたチャンネルは閉じたいと思うはずなので、チュートリアルにその方法が解説されていないのはちょっと不思議だ。

# チャンネルとスタブを作る
channel = GRPC::ClientStub.setup_channel(nil, address, :this_channel_is_insecure)
stub = DBMService::Stub.new(address, :this_channel_is_insecure,
                            channel_override: channel)

# 使い終わったらチャンネルを閉じる
channel.close

もう一個細かい点だが、tkrzw_rpcというパッケージ名のprotoからRubyのコードを生成すると、そのモジュール名は「TkrzwRpc」になる。RPCは頭字語なので、「Rpc」ではなく「RPC」であってほしいところだが、その旨を指定することができない。仕方ないので、出力されたRubyコードにsedで検索置換をかけるという荒業を使っている。

実装上で悩んだのはこれくらいだ。残りの作業はほとんどコピペで済んだので、2日ほどでRubyクライアントライブラリの実装ができた。gRPCの生産性がここに活かされた形だ。