豪鬼メモ

一瞬千撃

Tkrzw-RPCのGoクライアントライブラリ

PythonRubyのクライアントライブラリを書いたので、次はGoだ。gRPC界隈ではGoがよく使われるようなので、優先度高めで着手した。詳しいことはAPI文書をご覧いただきたいが、簡単な使い方をここでも紹介する。


他の言語と同じように、TkrzwのC++のコアライブラリには依存せずに、gRPCだけに依存するようにクライアントライブラリを書いた。よって、クライアント側ではTkrzwのコアライブラリをインストールする必要はない。とはいえ、動作確認するにはTkrzw-RPCのサーバが必要だ。なので、TkrzwとTkrzw-RPCは普通にmake, make installでインストールしておいていただく。Go moduleを使えば、クライアントライブラリのインストール作業は必要ない。インポート文を書くだけでgRPCとクライアントライブラリは勝手にインストールされる。

サンプルプログラムを実行する前に、Tkrzw-RPCのサーバを立てる。以下のコマンドを実行すればよい。デフォルトでは、ローカルホストの1978番ポートで、オンメモリデータベースをサーブする。デバッグログを出力して、どんな処理が行われているかをわかりやすくしておこう。

$ tkrzw_server --log_level debug

最も簡単なサンプルプログラムを示そう。Goのmapとほぼ同じように使えるのがセールスポイントだ。

package main

import (
  "fmt"
  // このインポート文だけ書けば勝手にインストールされて利用可能になる
  "github.com/estraier/tkrzw-rpc-go"
)

func main() {
  // データベースを準備する
  dbm := tkrzw_rpc.NewRemoteDBM()
  dbm.Connect("127.0.0.1:1978", -1)

  // レコードを書き込む
  // キーと値は暗黙的にバイト列に変換されて保存される
  dbm.Set("first", "hop", true)
  dbm.Set("second", "step", true)
  dbm.Set("third", "jump", true)

  // レコードを検索して、値を文字列として取り出す
  fmt.Println(dbm.GetStrSimple("first", "*"))
  fmt.Println(dbm.GetStrSimple("second", "*"))
  fmt.Println(dbm.GetStrSimple("third", "*"))

  // データベース内のレコードを横断的に取得する
  for record := range dbm.EachStr() {
    fmt.Println(record.Key, record.Value)
  }

  // 接続を閉じる
  dbm.Disconnect()
}

もうちょい複雑なサンプルも示そう。いちいちエラーチェックもやっている。CompareExchangeMultiを使って、アトミックなトランザクションを実現している。アリスのレコードの値を500減らして、ボブのレコードの値を500増やすという操作を、リモートからもアトミックに行える。

package main

import (
  "fmt"
  "github.com/estraier/tkrzw-rpc-go"
)

func main() {
  // データベースを準備する
  // タイムアウトを秒単位で指定できる
  // 各操作の成否はStatusとして返されるが、そのOrDieメソッドは、エラー時にのみ
  // panicを発生させる
  // 真面目なユースケースでは、任意のエラー処理を書くこと
  dbm := tkrzw_rpc.NewRemoteDBM()
  dbm.Connect("localhost:1978", 10).OrDie()

  // 遅延実行を使って接続を確実に閉じる
  defer func() { dbm.Disconnect().OrDie() }()

  // ボブとアリスの銀行口座を表現する
  // 預金額を10進数の文字列として表現する
  dbm.Set("Bob", 1000, false).OrDie()
  dbm.Set("Alice", 3000, false).OrDie()

  // 送金処理をアトミックに実現するトランザクション
  transfer := func(src_key string, dest_key string, amount int64) *tkrzw_rpc.Status {
    // 送金元と送金先の現在の値を調べる
    old_src_value := tkrzw_rpc.ToInt(dbm.GetStrSimple(src_key, "0"))
    old_dest_value := tkrzw_rpc.ToInt(dbm.GetStrSimple(dest_key, "0"))

    // 新しい値を計算する。送金元の金額を減らし、送金先の金額を増やす
    new_src_value := old_src_value - amount
    new_dest_value := old_dest_value + amount
    if new_src_value < 0 {
      return tkrzw_rpc.NewStatus(tkrzw_rpc.StatusApplicationError, "insufficient value")
    }

    // トランザクションの発行前と発行後の条件をスライスとして表現する
    old_records := []tkrzw_rpc.KeyValueStrPair{
      {src_key, tkrzw_rpc.ToString(old_src_value)},
      {dest_key, tkrzw_rpc.ToString(old_dest_value)},
    }
    new_records := []tkrzw_rpc.KeyValueStrPair{
      {src_key, tkrzw_rpc.ToString(new_src_value)},
      {dest_key, tkrzw_rpc.ToString(new_dest_value)},
    }

    // アトミックにトランザクションを実行する
    // 事前条件に合致した場合、つまり他のトランザクションが両口座の現在の預金額を
    // 変更していない場合、事後条件の値が設定される
    // 事前条件が満たされない場合、失敗する
    return dbm.CompareExchangeMultiStr(old_records, new_records)
  }

  // トランザクションを成功するまで試行する
  var status *tkrzw_rpc.Status
  for num_tries := 0; num_tries < 100; num_tries++ {
    status = transfer("Alice", "Bob", 500)
    if !status.Equals(tkrzw_rpc.StatusInfeasibleError) {
      break
    }
  }
  status.OrDie()

  // イテレータを使って、全ての口座の情報を印字する
  iter := dbm.MakeIterator()
  defer iter.Destruct()
  iter.First()
  for {
    key, value, status := iter.GetStr()
    if !status.IsOK() {
      break
    }
    fmt.Println(key, value)
    iter.Next()
  }
}

そんなわけで、TkrzwのローカルのAPIとほぼ同じ使用感になっているし、実際、Open/Closeの代わりにConnect/Disconnectになっていること以外は全く同じだ。ネットワークプログラミングをしている感じがほとんどコードに現れず、何ならデータベースを扱っている感じすらしないというのが良い感じではないか。

PythonRubyに比べると、gRPCの実装はだいぶやりやすかった。特にストリームの実装が楽だった。PythonRubyではイテレータをストリームに渡すAPIになっているのだが、それをインタラクティブに扱うのが本当に面倒くさいのだ。Goの場合はストリームの入出力処理を明示的に書けるので、楽だった。

ところで、gRPCのクライアントライブラリは、接続時にチャンネルを作っただけではエラーチェックをしない。これはGoに限らず、C++でもPythonでもRubyでも一緒だ。サーバが死んていた場合には、最初のRPCコールを送った時にそれが検出される。多くの場合、それは望ましい動作ではない。言語によっては、チャンネル生成時に接続が確立するまでブロックするというオプションがあるが、それはエラー時の再試行を延々と行ってくれるため、サーバが死んでいることをいつまでも検出できない。この挙動も、大抵のオンラインシステムでは許容できないだろう。

しかし、チャンネルの状態遷移を監視する機能が提供されているので、それを使うと、サーバのエラーを即座に検出できる。具体的居は、以下のようなコードである。

// チャンネルを作る
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
  return NewStatus2(StatusNetworkError, strGRPCError(err))
}
// 0.1秒毎に3回だけ状態遷移を監視し、それでも接続できなければ抜ける
maxFailures := 3
numFailures := 0
for {
  // 現在の状態を見て、接続できていれば抜ける
  state := conn.GetState()
  if state == connectivity.Ready {
    break
  }
  if state == connectivity.Idle && numFailures > 0 {
    // Idle状態で、かつ一度でもTransientFailureになったことがあれば、エラーとみなし
    // エラーの回数をカウント
    numFailures += 1
  } else if state == connectivity.TransientFailure {
    // TransientFailureは回復可能かもしれないエラー
    // エラーの回数をカウント
    numFailures += 1
  } else if state == connectivity.Shutdown {
    // Shutdownだともう回復不能
    numFailures = maxFailures
  }
  // エラー回数が規定を超えた場合、あきらめる
  if numFailures >= maxFailures {
    conn.Close()
    return NewStatus2(StatusNetworkError, "connection failed")
  }
  // チャンネルが次の状態に移るまで最大0.1秒待機
  ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
  defer cancel()
  conn.WaitForStateChange(ctx, state)
}

他の言語でもWaitForStateChangeに類する機能はあるのだが、GoのそれはまだExperimentalらしい。それもあるのかもしれないが、状態遷移の仕方がが他の言語と違うことには注意せねばならない。C++PythonRubyでは、サーバが死んでいる場合には、ずっとTransientFailure状態になるのだが、Goだけは、一度TransientFailureになってから、なぜかIdleに遷移する。これはバグなんじゃないかと疑ってはいるが、確証はない。上記のコードでは、この挙動を踏まえた処理を書いているが、正直奇妙だなと思う。


まとめ。Goのクライアントも実装できたので、とりあえずTkrzw-RPCでやりたいことはだいたい出来た感じだ。C++PythonRubyとGoのファイルを同時に開いて同じ処理を書くという修行をしていたので、各言語の特徴が把握できて面白かった。次回は簡単な性能比較を行いたい。