豪鬼メモ

一瞬千撃

データベースからGo言語の関数を呼ぶ

コールバック関数を読んでデータベースの内容をアトミックに調べて更新する機能をGo言語版にもつけた。これでJava/Python/Rubyも含めた全言語でコールバックが使えるようになった。


TkrzwのGoバインディングは以前から存在していて、それはGo言語からC言語の関数を呼び出すように実装されていた。今回は、Goから読んだCの関数の中からGoの関数を呼ぶという構造になっている。これがなかなか面倒だったので、カラクリをメモしておく。

まずはサンプルコードで使い方を説明しておこう。DBMクラスのProcess系メソッドに関数を渡すと、Processなら指定した単一レコードを、ProcessMultiなら指定した複数レコードを、ProcessEachならデータベース全体をロックした状態で、関数を実行してくれる。関数のパラメータとしてレコードの現状の値が渡され、関数の戻り値がレコードの新たな値になる。

package main

import (
  "fmt"
  // こいつをimportしておくだけで、Tkrzwが使えるようになる
  "github.com/estraier/tkrzw-go"
  "regexp"
  "strings"
)

func main() {
  // データベースファイルを開く。
  dbm := tkrzw.NewDBM()
  dbm.Open("casket.tkh", true,
    tkrzw.ParseParams("truncate=true,num_buckets=100"))
  // 終了時に自動的にデータベースファイルが閉じるようにする。
  defer dbm.Close()

  // 無名関数を呼び出す方法で、データベースにレコードを追加する。
  dbm.Process("doc-1", func(k []byte, v []byte) interface{} {
    return "Tokyo is the capital city of Japan."
  }, true)
  dbm.Process("doc-2", func(k []byte, v []byte) interface{} {
    return "Is she living in Tokyo, Japan?"
  }, true)
  dbm.Process("doc-3", func(k []byte, v []byte) interface{} {
    return "She must leave Tokyo!"
  }, true)

  // レコードの値を小文字に変換する関数。
  lower := func(k []byte, v []byte) interface{} {
    // もし指定したレコードがなければ、nilが渡されるので、何もしない。
    if v == nil {
      return nil
    }
    // 新しい値を返せば、それが設定される。
    // 戻り値は文字列でも数値でもバイト列でも、勝手にバイト列になる。
    return strings.ToLower(string(v))
  }
  dbm.Process("doc-1", lower, true)
  dbm.Process("doc-2", lower, true)
  dbm.Process("doc-3", lower, true)
  dbm.Process("non-existent", lower, true)

  // キーと関数のペアのリストを与えて、複数のレコードを一括追加する。
  ops := []tkrzw.KeyProcPair{
    {"doc-4", func(k []byte, v []byte) interface{} { return "Tokyo Go!" }},
    {"doc-5", func(k []byte, v []byte) interface{} { return "Japan Go!" }},
  }
  dbm.ProcessMulti(ops, true)

  // 同様に既存のレコードの一括更新もできる。
  dbm.ProcessMulti([]tkrzw.KeyProcPair{{"doc-4", lower}, {"doc-5", lower}}, true)

  // 既存のレコードを全て調べる従来の方法。
  // チャンネルと外部イテレータを使うので、性能的に最適ではない。
  for record := range dbm.EachStr() {
    fmt.Println(record.Key, record.Value)
  }

  // ワードカウントを行う関数。
  wordCounts := make(map[string]int)
  wordSplitter := regexp.MustCompile("\\W")
  wordCounter := func(key []byte, value []byte) interface{} {
    if key == nil {
      return nil
    }
    words := wordSplitter.Split(string(value), -1)
    for _, word := range words {
      if len(word) == 0 {
        continue
      }
      wordCounts[word] += 1
    }
    return nil
  }

  // ワードカウント関数をデータベース全体に適用する。
  // 第2引数がfalseだと、更新できない代わりに、より高速になる。
  dbm.ProcessEach(wordCounter, false)
  for word, count := range wordCounts {
    fmt.Printf("%s = %d\n", word, count)
  }

  // コールバック関数の戻り値をRemoveBytesにすると、既存のレコードを消せる。
  dbm.Process("doc-1", func(k []byte, v []byte) interface{} {
    return tkrzw.RemoveBytes
  }, true)
  println(dbm.CountSimple())
  dbm.ProcessMulti([]tkrzw.KeyProcPair{
    {"doc-4", func(k []byte, v []byte) interface{} { return tkrzw.RemoveBytes }},
    {"doc-5", func(k []byte, v []byte) interface{} { return tkrzw.RemoveBytes }},
  }, true)
  println(dbm.CountSimple())
  dbm.ProcessEach(func(k []byte, v []byte) interface{} {
    return tkrzw.RemoveBytes
  }, true)
  println(dbm.CountSimple())
}

並列処理が得意なGo言語と、排他制御カプセル化したコールバック式データベース処理の相性はかなり良いはずだ。PythonRubyでも同等の機能があるが、ネイティブスレッドの並列性を活かせるGoやJavaでこそ真価が出せると思う。

さて、Go言語のコールバックの実装はJavaRubyPythonよりはるかに面倒くさかったというか、できないと思って諦めかけていた。cgoの規約においては、Goの関数ポインタをCの生ポインタにキャストすることが禁じられているからだ。GCの関係で関数や関連するリソースのアドレスが一定である保証がないからだそうな。しかし、調べてみると、mattn/go-pointerというパッケージでうまいことやっているのを見つけた。コードを読んだところ、こんな感じの動作っぽい。

  • Save : 呼び出すべきGoの関数に対応したCのダミーのメモリ領域を作り、そのアドレスをキーにしたマップに双方を登録し、ダミー領域のアドレスを返す。
  • Unref : ダミー領域のアドレスを受けとり、マップ内のレコードとダミー領域のリソースを解放する。
  • Restore : ダミー領域のアドレスを受けとり、Goの関数を返す。

この機構があれば、C側にはダミー領域のアドレスだけを渡すことができるようになる。関数呼び出しのためにわざわざマップを操作したり、その操作をMutexでガードしなければいけないなどのオーバーヘッドはあるが、これがあればなんとかなりそうだ。こんな一休さんみたいな方法がベストプラクティスなのは俄には信じがたかったが、rationaleを読むと、そうせざるを得ないことに納得する。とはいえ、この辺はcgo自体に面倒見てほしいところだけども。

さて、Go言語において、関数定義の直前の行に "//export xxx" などと書いておくと、ビルド時にそれがC側から見えるようになる。これを使えば、静的な関数であれば、C側から簡単に実行できる。ならば、その静的な関数に上述のダミー領域のアドレスを渡して、その中でGo関数を復元して実行すれば、動的に渡された任意の関数が実行できるようになる。以上のことを既存のコードにマージすると、こんな感じになる。

// RecordProcessor型(func([]byte,[]byte)[]byte)の関数を管理するマップ。
type RecordProcessorPool struct {
  data  map[unsafe.Pointer]RecordProcessor
  mutex sync.Mutex
}

// 上記マップの配列。並列性向上のためにスロット化する。
var recordProcessorPools = []RecordProcessorPool{
  {data: make(map[unsafe.Pointer]RecordProcessor)},
  {data: make(map[unsafe.Pointer]RecordProcessor)},
  {data: make(map[unsafe.Pointer]RecordProcessor)},
}

// Go関数を登録してCのダミーポインタを返す。
func registerRecordProcessor(proc RecordProcessor) unsafe.Pointer {
  var up unsafe.Pointer = C.malloc(C.size_t(1))
  if up == nil {
    panic("memory allocation failed")
  }
  procPool := recordProcessorPools[int(uintptr(up) >> 3) % len(recordProcessorPools)]
  procPool.mutex.Lock()
  procPool.data[up] = proc
  procPool.mutex.Unlock()
  return up
}

// ダミーポインタを渡してリソースを開放する。
func deregisterRecordProcessor(up unsafe.Pointer) {
  procPool := recordProcessorPools[int(uintptr(up) >> 3) % len(recordProcessorPools)]
  procPool.mutex.Lock()
  delete(procPool.data, up)
  procPool.mutex.Unlock()
  C.free(up)
}

// ダミーポインタを渡して、Go関数を実行する。
//export callRecordProcessor
func callRecordProcessor(up unsafe.Pointer, keyPtr unsafe.Pointer, keySize C.int32_t,
  valuePtr unsafe.Pointer, valueSize C.int32_t) (unsafe.Pointer, int32) {
  // Go関数を取り出す。
  procPool := recordProcessorPools[int(uintptr(up) >> 3) % len(recordProcessorPools)]
  procPool.mutex.Lock()
  proc := procPool.data[up]
  procPool.mutex.Unlock()
  // Cの生ポインタからキーのGoバイト列を作成。
  var key []byte
  if keyPtr == nil {
    key = nil
  } else {
    key = C.GoBytes(keyPtr, keySize)
  }
  // Cの生ポインタから値のGoバイト列を作成。
  var value []byte
  if valuePtr == nil {
    value = nil
  } else {
    value = C.GoBytes(valuePtr, valueSize)
  }
  // キーと値を渡してGo関数を呼ぶ。
  rv := proc(key, value)
  // Go関数の戻り値をC側で処理しやすいようにCの生バイト列に変換。
  var retPtr unsafe.Pointer
  var retSize int32
  if IsNilData(rv) {
    retPtr = nil
    retSize = 0
  } else if IsRemoveData(rv) {
    retPtr = unsafe.Pointer(uintptr(1))
    retSize = 0
  } else {
    rv_bytes := ToByteArray(rv)
    // 呼び出しのC側でfreeすること!
    retPtr = C.CBytes(rv_bytes)
    retSize = int32(len(rv_bytes))
  }
  return retPtr, retSize
}

mapの部分がボトルネックにならないように、ダミーアドレスでハッシュしてから複数のスロット(マップとロックのペア)を使い分ける実装にしている。上記では3つのスロットを使っているが、1つでも7つでも127個でも動く。ダミーアドレスの下位3ビットを捨てた値をスロット数で割ってスロット割り当てている。アロケータの癖に左右されないように、スロット数は素数にしておくのが無難だろう。元のgo-pointerだとMutexの代わりにRWMutexを使っていて、Restore操作を共有ロックにして効率化を図っていたけど、衝突が少ない場合にはロック自体のオーバーヘッドが利いてくるので、MutexとRWMutexのどちらが得かは場合によりけりだ。たとえスロット数が1つであっても、mapのサイズが激しく変わってリハッシュが頻繁に起こることは考えづらいので、mapの操作は一瞬で終わるはずだ。それをさらにスロット化しているので、たとえ並列数が高かったとしても、ロックはほとんど衝突しないだろう。となると、Mutexで十分だ。というか、3回の内1回を共有ロックにできるメリットよりは、フットプリントが小さいMutexでスロット数を増やせるメリットの方が大きい気がする。性能テストで比較したわけじゃないから確たることは言えないけれど。

Goのコールバックによるレコード更新は、そうでない普通のSetメソッドによる更新に比べてどのくらいのオーバーヘッドがあるのか、検証してみた。キーと値が各8バイトのレコードを100万個格納する操作のスループット(QPS)を測定した。

QPS
C++ Set 2,138,031
Go Set 685,871
Go Process 482,160
Go ProcessMulti 360,100

まず、C++のSetの約210万QPSに比べると、Goのそれは1/3ほどのスループットになる。それでも毎秒68万回の更新ができるのだから、たいていのユースケースで、ボトルネックになることはないだろう。同じ内容の更新をProcessメソッドでやると、スループットは48万QPSとなり、GoのSetに比べて70%ほどということになる。これはかなりいい数字だと思う。ProcessMultiにすると36万QPSなので、52%だ。ProcessMultiは実際には1回で複数のレコードを更新するものなので、実用上のスループットはもっと高いだろう。これらはかなりいい数字だと思う。毎回の呼び出しでMutexのロックとアンロックを3回ずつ、mapの操作も3回行っていることを考えれば、この程度の性能低下で済んでいるのは御の字だ。

まとめ。CからGoのコールバック関数を呼ぶ機構が実装できた。データベース操作の最強の柔軟性がここにある。久しぶりにGo言語を書いたが、Cをひたすら現代風にしてみましたって感じが伝わってきて、いいな。誰でも安全快適に走れる電動自転車の如き基本設計だけれども、本気を出す時はピストに変身して爆走できる的な。