豪鬼メモ

一瞬千撃

DBMでメッセージキューイング その1 メッセージキューサービスの利用法

前回の記事で、メッセージキューを実現するための要素技術について論じた。今回は、それを実際にTkrzw-RPCのサーバに組み込んで、分散処理を行うデモを紹介する。Go、PythonRubyが一気に出てくるお得な記事だ。
f:id:fridaynight:20211021134932p:plain


1から順に増加する整数の値を1秒毎に生成するプログラムを書くとしよう。生成した値は、逐次、メッセージキューに入れられる。メッセージキューを監視する別のプログラムがあり、それは、取り出した値を素因数分解した結果を結果用のメッセージキューに書き込む。さらに別のプログラムが結果用のメッセージキューを監視して、それを逐次印字する。タスクを処理するためのキューと、タスクの結果を受け取るためのキューの2本を使っているところがちょっと面白いところだ。

整数を生成するプログラムはGo言語で実装しようか。generate.goは以下のような実装になる。

package main

import (
  "fmt"
  "github.com/estraier/tkrzw-rpc-go"
  "time"
)

func main() {
  // データベースに接続
  dbm := tkrzw_rpc.NewRemoteDBM()
  dbm.Connect("localhost:1978", -1, "").OrDie()
  // 1000までの整数を生成するループ
  for i := 1; i <= 1000; i++ {
    fmt.Printf("Processing: %d\n", i)
    // 最後の要素としてデータを格納し、通知を発送
    dbm.PushLast(tkrzw_rpc.ToString(i), -1, true).OrDie()
    // 1秒待つ
    time.Sleep(1000 * time.Millisecond)
  }
  // 接続を切る
  dbm.Disconnect().OrDie()
}

PushLastメソッドは、タイムスタンプから固定長のキーを自動生成するので、そのレコードを順序付きのデータベースに入れれば、必ず最後の位置に置かれる。万が一、同一タイムスタンプのレコードが存在していても、キーを再生成して重複回避を行ってくれる。また、第三パラメータを真にすると、通知シグナルが送信される。

この場合、整数を素因数分解するというタスクを分散して実行していることになる。生成された個々の整数がタスクの指示だ。タスクを読み込んで実行するプログラムはPythonで実装しようか。factorize.pyは以下のような実装になる。

import tkrzw_rpc

# 素因数分解の関数
def Factorize(num):
  factors = []
  for div in range(2, num + 1):
    while (num % div) == 0:
      factors.append(div)
      num //= div
  return factors

# データベースに接続
dbm = tkrzw_rpc.RemoteDBM()
dbm.Connect("localhost:1978").OrDie()

# 無限ループ
while True:
  # 操作対象を0番データベースに設定
  dbm.SetDBMIndex(0)
  # 5秒でタイムアウトさせつつメッセージを得る
  record = dbm.PopFirstStr(5)
  if not record: continue
  num = int(record[1])
  # 素因数分解を行う
  print("Processing: " + str(num))
  factors = Factorize(num)
  # 操作対象を1番データベースに設定
  dbm.SetDBMIndex(1)
  # 結果をメッセージとして書き込む
  # 最後の要素としてデータを格納し、通知を発送
  expr = str(num) + " = " + str(factors)
  record = dbm.PushLast(expr, -1, True).OrDie()

# 接続を切る。上が無限ループなので実行されないけど
dbm.Disconnect().OrDie()

PopFirstメソッドは、キューからタスク指示を読み込むと同時に、そのレコードを削除する。読み込みと削除はアトミックに行われるので、複数のプロセスが同時にキューに接続しても競合が起きない。また、パラメータに正の実数を与えた場合、取得失敗時に指定時間(秒単位)だけ待機する。その間に結果が得られれば即座にそれを返し、結果が期間内に来なければ、タイムアウトのエラーを返す。そうして得た整数を素因数分解した結果は、PushLastメソッドで結果用のキューに入れられる。

タスクを読み込んで実行するプログラムをRubyでも実装してみよう。factorize.rbは以下のような実装になる。

require "tkrzw_rpc"

# 素因数分解の関数
def factorize(num)
  factors = []
  (2..num).each do |div|
    while num % div == 0
      factors.push(div)
      num /= div
    end
  end
  factors
end

# データベースに接続
dbm = TkrzwRPC::RemoteDBM.new
dbm.connect("localhost:1978").or_die

# 無限ループ
while true
  # 操作対象を0番データベースに設定
  dbm.set_dbm_index(0)
  # 5秒でタイムアウトさせつつメッセージを得る
  record = dbm.pop_first(5)
  next if not record
  num = record[1].to_i
  # 素因数分解を行う
  puts("Processing: " + num.to_s)
  factors = factorize(num)
  # 操作対象を1番データベースに設定
  dbm.set_dbm_index(1)
  # 結果をメッセージとして書き込む
  # 最後の要素としてデータを格納し、通知を発送
  expr = num.to_s + " = " + factors.to_s
  record = dbm.push_last(expr, -1, true).or_die
end

# 接続を切る。上が無限ループなので実行されないけど
dbm.disconnect.or_die

タスクの結果は2番目のデータベースに格納される。それをリアルタイムに読み込んで、端末に印字するプログラムをGoで実装しよう。print.goは以下のような実装になる。

package main

import (
  "fmt"
  "github.com/estraier/tkrzw-rpc-go"
)

func main() {
  // データベースに接続
  dbm := tkrzw_rpc.NewRemoteDBM()
  dbm.Connect("localhost:1978", -1, "").OrDie()
  // 操作対象を1番データベースに設定
  dbm.SetDBMIndex(1)
  // 無限ループ
  for {
    // 最初の要素を文字列として取り出す
    _, value, status := dbm.PopFirstStr(5)
    if status.GetCode() == tkrzw_rpc.StatusSuccess {
      // 成功したなら、印字する
      fmt.Println(value)
    }
  }
  // 接続を切る。上が無限ループなので実行されないけど
  dbm.Disconnect().OrDie()
}

さて、これらを組み合わせて実行してみよう。まずはサーバを立てる。順序付きのファイルデータベースであるTreeDBMを使おう。タスク指示の整数を入れるためのデータベースはintegers.tktで、素因数分解の結果を入れるためのデータベースはfactors.tktだ。非同期APIモードにして、デバッグログも表示する。

$ tkrzw_server --log_level debug --async integers.tkt factors.tkt

別の端末で、generate.goを実行する。整数がタスクとしてintegers.tktに溜まっていく。サーバのログもご覧頂きたい。

$ go get
$ go run generate.go

さらに別の端末を二つ開いて、factorize.pyとfactorize.rbをそれぞれ実行する。溜まっていたタスクが一気に処理されるとともに、投入されたタスクがリアルタイムで処理されているのがわかる。結果はfactors.tktに溜まっていく。

$ python3 factorize.py
$ ruby factorize.rb

さらに別の端末を開いて、print.goを実行する。溜まっていた結果が一気に表示されるとともに、投入されてリアルタイムに処理されたタスクの結果がリアルタイムに表示されていることがわかる。

$ go get
$ go run print.go
...
252 = [2, 2, 3, 3, 7]
253 = [11, 23]
254 = [2, 127]
255 = [3, 5, 17]
256 = [2, 2, 2, 2, 2, 2, 2, 2]
257 = [257]
258 = [2, 3, 43]
259 = [7, 37]
260 = [2, 2, 5, 13]
261 = [3, 3, 29]
...

generator.goを実行している端末と、print.goを実行している端末を並べて眺めてほしい。タスクが投入されるやいなや、ほとんど遅延なく、結果が表示されているはずだ。これがリアルタイム性というやつだ。計算機とネットワークに可能な限界の速度で処理を行っていて、無駄な待ち時間は一切ない。

このように、タスクを分散処理しつつ、その結果をリアルタイムに得たい場合に、メッセージキューが活躍する。バッチジョブとして大量のタスクを捌きたいならばMapReduceやFlumeといったフレームワークを使うべきなのだが、投入したタスクの結果をできるだけ早く得たいのならば、メッセージキューを使うべきだ。言い換えると、スループット重視ならMapReduce、レイテンシ重視ならメッセージキューということだ。

実際のところ、リアルタイムに大量の数値計算をすることはあまりないだろう。事前計算をするなら、MapReduceを使うほうがよい。一方、メッセージキューは基本的にはオンラインシステムの負荷を制御するために使われる。例えば、画像処理や言語処理などの数秒かかるような重い処理をオンラインで提供するWebサービスがあるとしよう。もしWebサーバがバックエンドを直接叩くとすると、多数のクエリを同時に捌く負荷がバックエンドにかかってしまうし、負荷が許容範囲を超えると全てのクエリがタイムアウトする事態に陥る。メッセージキューを使うなら、バックエンドは自らの最大スループットのペースでタスクを取り出して処理が行える。Webサーバは、クエリを発行した後、結果のデータベースを監視するだけで良い。一定時間以内に結果が帰ってくればそれをクライアントに返し、そうでなければエラーをクライアントに通知する。

バックエンドのマシンを増やしてスループットを上げるのが分散処理の目的だ。一方で、メッセージキューは、限られたスループットを超えた場合に、キューにタスクを貯めることで、負荷を一定に保つ仕組みだ。それによってスループットが上がるわけではないが、時間的に負荷を分散させる、いわば「時間分散」の効果がある。オンラインサービスはオフタイムとピークタイムの負荷の差が激しいので、重そうな処理はメッセージキューを挟むように設計しておくと幸せになれる。

メッセージキューがサービスになっていることで、柔軟にシステムを組めるのも美点だ。この例だと、generator.goを足してタスク投入を並列化してもいいし、factorize.pyやfactorize.rbを増やしてタスク処理の並列性を高めてもいいし、printer.goを増やして結果を分散処理してもいい。各コンポーネントはどんな言語で書いても良い。タスクの指示や結果が複雑なデータ構造である場合、ProtobufやJSONで表現すると良いだろう。


メッセージキューサービスの実装について軽く解説したい。メッセージキューの最も重要な性能要件は、レイテンシを低くすることである。言い換えれば、投入されたメッセージができるだけ早く読み出されることだ。そのため、条件変数と通知を使った更新監視機能が必須となる。前回の記事のおさらいになるが、以下のフローになる。

  • 読み出し側のスレッドは、キューを調べ、メッセージがあれば即座に返す。
  • メッセージがない場合、待機状態になる。
  • 書き込み側のスレッドは、書き込みリクエストがあり次第、それをキューに入れ、通知を投げる。
  • 待機中の読み出し側のスレッドは通知によって即座に起こされ、キューにあるメッセージを返す。

これをgRPCのフレームワークに入れ込む。書き込み側のスレッドは、書き込みの後に通知を送るだけなので、特に難しくない。問題は、待機が必要な読み込み側だ。gRPCには同期APIと非同期APIがあり、Tkrzw-RPCのサーバは両方をサポートしている。同期APIでは単一のクエリがスレッドを専有するので、待機処理をそのまま書けば良い。疑似コードで書くなら以下のようになる。実際のコードはこちら

// 締め切り時間の算出
deadline = GetWallTime() + request.wait_time;
// 締め切り時間までループ
while (GetWallTime() < deadline) {
  // シグナルブローカに共有ロックをかけて、排他ロック内で行われる通知をブロック
  // Waiterクラスは、共有ロックを所有し、生成時にロックをかけ、破壊時に外す
  Waiter waiter(broker)
  // キューからメッセージを取り出す
  message = queue.GetMessage();
  // メッセージが取り出せたなら、それを出力する
  if (message != null) {
    EmitMessage(message);
    break;
  }
  // メッセージが取り出せなかったなら、共有ロックを外して、通知を受け取るまで待機
  // WaiterクラスのWaitメソッドは、待機状態を開始してから、共有ロックを外す
  wait_time = deadline - GetWallTime();
  waiter.Wait(wait_time);
}

上記をそのまま非同期APIに使うわけにはいかない。非同期APIでは単一スレッドで複数のクエリをタイムスライスしながら処理するので、あるクエリの処理がブロックすると、他のクエリの処理もブロックされてしまう。読み出しクエリが書き込みクエリをブロックした場合、デッドロックになる。よって、非同期APIの場合はバックグラウンドのスレッドを立てて待機処理を行う。しかし、キューにメッセージが大量に溜まっている場合には、わざわざメッセージ毎に待機用のスレッドを立てるのは無駄だ。よって、キューを調べてメッセージがあった場合には、それを読み込んで返し、そうでない場合にのみ、待機用のスレッドを立てるという実装にすべきだ。キューが空ということはCPUが暇だということなので、スレッドを立てても負荷の問題はない。スレートマシンの疑似コードを以下に示す。実際のコードはこちら

if (proc_state_ == CREATE) {
  // CREATE状態ならば、PROCESS状態に遷移しつつ、リクエストを取得する処理をキューに入れる
  proc_state_ = PROCESS;
  queue_.GetRequest(&context_, &request_, &responder_, queue_, queue_, this);
} else if (proc_state_ == PROCESS) {
  // リクエストが取得できたら、ここに来る
  // 自分の子供を産卵して、同種の他のクエリが呼べるようにする
  SpawnChild(queue_);
  // キューからメッセージを取り出す
  message = queue.GetMessage();
  if (message == null) {
    // メッセージが無い場合、バックグラウンドで処理を行う。内容は同期APIとほぼ一緒
    auto task = [&]() { ... };
    bg_thread_ = std::thread(task);
  } else {
    // メッセージがある場合、終了状態に遷移しつつ、それをそのまま返す
    response_.message = message
    proc_state_ = FINISH;
    responder_.Finish(response_, this);
  }
} else {
  // 終了状態の場合、死ぬ
  delete this;
}

以上の工夫により、高スループットと低レイテンシを両立するメッセージキューが実装できた。ポーリングやビジーループを使っても待機処理の実装はできるのだが、ポーリングだとレイテンシが高くなり、ビジーループだとスループットが低くなるという問題がある。となると条件変数を使った監視処理が必須になる。gRPCの非同期APIで監視処理を実装するには、今回の方法が現実的だ。


まとめ。メッセージキューサービスの簡単な使い方と、機能要件と、サーバ側の実装の要点を説明した。単なるキューなのに、リアルタイム性とか言い出すと、かなり高度なスレッドプログラミングの知識が要求される。しかし、その面倒なところをメッセージキューサービスが請け負うことで、利用者であるフロントエンドとバックエンドの実装は単純化できる。フロントエンドは単にクエリをキューにぶっ込めばよいし、バックエンドは単にクエリを読み込んでから処理すればよい。フロントエンドとバックエンドでクエリとその結果をピンポンしたい場合、2本のメッセージキューを使えば良い。