豪鬼メモ

一瞬千撃

Tkrzw-RPCのPythonクライアントライブラリ

DBサービスを作ろうの一連の連載が終了し、無事DBサービスが完成したので、クライアントライブラリを作る旅に出ている。とりあえずPython版ができたので、使い勝手などを説明したい。Tkrzw-RPC本体については、スライドの方をご覧頂きたい。


Tkrzw-RPCのサーバはC++で実装されているが、gRPCを使っているおかげで各種言語のクライアントライブラリを対応するのも楽だ。もっと手抜きして、C++のクライアントライブラリを各言語から呼ぶという案もあったのだが、そうすると後で移植性に苦労することになるので、やはり各言語のgRPCライブラリのみを使って実装することにした。

私が実用レベルで書いたことのある言語はC/C++の他にはJavaPythonRubyとGoとJavaScriptくらいだが、その中のどれを自分で書いてメンテするかにまず悩む。使用者の数から言えばJavaなんだろうけど、gRPCのエコシステムの中ではGoの存在感が結構強いので、そこから始めるのもありかと思った。とかいいつつ、とりあえず楽に書き始められそうだったPythonから手を付けることにした。

能書きはさておき、導入方法を説明しよう。TkrzwとTkrzw-RPCは普通にmake, make installでインストールしておいていただく。gRPCのPythonライブラリは本家のページに書いてあるようにpipでインストールするのが楽だ。

$ sudo python -m pip install grpcio

Tkrzw-RPC-Pythonのインストールも普通にmake, make installでやればOK。configureは不要。

$ tar zxvf tkrzw-rpc-python-0.1.2.tar.gz
$ cd tkrzw-rpc-python-0.1.2
$ make
$ sudo make install

サンプルプログラムを実行する前に、Tkrzw-RPCのサーバを立てる。以下のコマンドを実行すればよい。デフォルトでは、ローカルホストの1978番ポートで、オンメモリデータベースをサーブする。デバッグログを出力して、どんな処理が行われているかをわかりやすくしておこう。

$ tkrzw_server --log_level debug

最も簡単なサンプルプログラムを示そう。PythonのDictとほぼ同じように使えるのがセールスポイントだ。

# Tkrzw-RPCのパッケージをインポートする
import tkrzw_rpc

# データベースを準備する
dbm = tkrzw_rpc.RemoteDBM()
dbm.Connect("localhost:1978")

# レコードを追加する
# もしエラーがあれば例外で通知される
# 実際のデータはバイト列として記録されるが、文字列のキーや値を渡してもOK
dbm["first"] = "hop"
dbm["second"] = "step"
dbm["third"] = "jump"

# レコードを検索する
# もしエラーがあれば例外で通知される。該当レコードがないも例外になる
# キーが文字列なら、結果も文字列で返される。そうでなければ、バイト列になる
print(dbm["first"])
print(dbm["second"])
print(dbm["third"])
try:
    print(dbm["fourth"])
except tkrzw_rpc.StatusException as e:
    print(repr(e))

# 横断的に全レコードを操作する
# 結果はバイト列なので、表示する際に文字列に直している
for key, value in dbm:
    print(key.decode(), value.decode())

# 接続を切断する
dbm.Disconnect()

もうちょい複雑なサンプルも示そう。いちいちエラーチェックもやっている。

import tkrzw_rpc

# データベースを準備する
# タイムアウトは秒単位で設定する
dbm = tkrzw_rpc.RemoteDBM()
status = dbm.Connect("localhost:1978", timeout=10)
if not status.IsOK():
    raise tkrzw_rpc.StatusException(status)

# 単一サーバで複数データベースを運用している場合、その操作対象を指定できる
# デフォルトは0で、最初のデータベースが対象となっている
dbm.SetDBMIndex(0).OrDie()

# レコードを追加する
# Set等の多くのメソッドはStatusオブジェクトを返すが、そのOrDieメソッドは、
# 成功状態ではない場合に自身を例外にして投げる
dbm.Set(1, "hop").OrDie()
dbm.Set(2, "step").OrDie()
dbm.Set(3, "jump").OrDie()

# エラーチェックなしにレコードを検索する
# 検索が失敗した場合にはNoneが返される
print(dbm.GetStr(1))
print(dbm.GetStr(2))
print(dbm.GetStr(3))
print(dbm.GetStr(4))

# 例外を使わずにエラーチェックをしたい場合、statusオブジェクトを作って渡す
# statusオブジェクトとステータスコードは直接比較できる
status = tkrzw_rpc.Status()
value = dbm.GetStr(1, status)
print("status: " + str(status))
if status == tkrzw_rpc.Status.SUCCESS:
    print("value: " + value)

# データベースを再構築する
dbm.Rebuild().OrDie()

# 横断的に全レコードにアクセスする
iter = dbm.MakeIterator()
iter.First()
while True:
    status = tkrzw_rpc.Status()
    record = iter.GetStr(status)
    if not status.IsOK():
        break
    print(record[0], record[1])
    iter.Next()

# 接続を閉じる
dbm.Disconnect().OrDie()

そんなわけで、TkrzwのローカルのAPIとほぼ同じ使用感になっている。実際、Open/Closeの代わりにConnect/Disconnectを呼ぶこと以外は全く同じだ。ネットワークプログラミングをしている感じがほとんどコードに現れず、データベースを扱っている雰囲気が全くないというのが良い感じではないか。私の理想がここに結実した。詳細についてはAPI文書をご覧いただきたい。

データベース内の個々のレコードを参照するイテレータは、gRPCの双方向ストリームを使って実装されている。その対応が面倒だったので、それだけメモしておこう。PythonのgRPCライブラリでは、クライアントからサーバへのストリームを表現するのにイテレータを用いる必要がある。サーバからクライアントに返されるストリームもイテレータとして表現される。よって、双方向ストリームのAPIは、入力用のイテレータを渡して出力用のイテレータを受け取るというシグネチャになる。IterateというメソッドのAPIはこんな感じだ。

output_iterator = stub.Iterate(input_iterator)

さて、リクエスト一つを入力するとレスポンスが一つ出てくるというピンポン方式の駆動をするとして、入力がAPIの呼び出しであり、それに対応する出力を返却するというのが今回のユースケースだ。典型的には以下の感じで、First、Get、Nextに対応する3回のピンポンが行われる。

it = dbm.MakeIterator();
it.First()
record = it.Get()
print(record)
it.Next()

入力用のイテレータを作った時には入力内容が決まっておらず、そして入力を与える度にそれに対応する出力を取り出すという手順を実装しなければならない。結論としては、Threading.Eventを使って次の入力を待ってから結果を返す機能を入力イテレータに持たせる必要がある。そして、イテレータAPIの各呼び出しで、リクエストを入力イテレータに格納してから、それを通知する。RemoteDBMのイテレータクラスの実装は以下のようになる。

class Iterator:

  # コンストラクタ
  def __init__(self, dbm):
    self.dbm = dbm
    # 入力イテレータを作る
    class RequestIterator():
      def __init__(self):
        self.request = None
        self.event = threading.Event()
      def __next__(self):
        # 次の入力を待つ
        self.event.wait()
        self.event.clear()
        # リクエストがあれば返す
        if self.request:
          return self.request
        # なければセッションを終了する
        raise StopIteration
    self.req_it = RequestIterator()
    # 入力イテレータを渡して、出力イテレータを得る
    try:
      self.res_it = dbm.stub.Iterate(self.req_it)
    except grpc.RpcError as error:
      self.dbm = None
      self.req_it = None

  # 最初のレコードに飛ぶ
  def First(self):
    # リクエストを作る
    request = tkrzw_rpc_pb2.IterateRequest()
    request.dbm_index = self.dbm.dbm_index
    request.operation = tkrzw_rpc_pb2.IterateRequest.OP_FIRST
    try:
      # 入力イテレータにリクエストを入れてから通知する
      self.req_it.request = request
      self.req_it.event.set()
      # 出力イテレータからレスポンスを取り出す
      response = self.res_it.__next__()
      self.req_it.request = None
    except grpc.RpcError as error:
      return Status(Status.NETWORK_ERROR, _StrGRPCError(error))
    # 結果を返す
    return _MakeStatusFromProto(response.status)

実装上で悩んだのはこれと、接続時のエラー検出くらいだ。残りの作業はほとんどコピペで済んだので、2日ほどでPythonクライアントライブラリの実装ができた。gRPCの生産性がここに活かされた形だ。