豪鬼メモ

一瞬千撃

Python/Rubyの非同期APIとコルーチン

PythonRubyではマルチスレッドを使うと性能がむしろ悪化する。そんな話を以前の記事で書いた。グラフに現れているように、1スレッドだとGoと同じくらいの性能なのに、マルチスレッドにするとスループットが激減してしまうのだ。ところで、ゲームやその他の対話的なアプリケーションでは、並行処理(並列処理でなく)のためにコルーチンとかファイバーとか呼ばれる機構を使うことが多い。そこでの利便性を考えて、Tkrzwでも非同期APIを実装してみた。
f:id:fridaynight:20210724153934p:plain


Pythonの非同期APIは、以下のように使うことができる。AsyncDBMというアダプタを介してAPIを呼ぶと、各メソッドはFutureというオブジェクトを返す。実際の処理はスレッドプールで管理される別のネイティブスレッドで行われる。結果はFutureオブジェクトのGetメソッドを使って取り出すことができる。Futureクラスはawaitableプロトコルを実装しているので、コルーチンの中ではawait文を使って待機することもできる。await文を使うと、待機中に別のコルーチンを進捗させられるので、全体のスループットを上げることができる、とされる。

import asyncio
import tkrzw

async def main():
  # データベースを開く
  dbm = tkrzw.DBM()
  dbm.Open("casket.tkh", True, truncate=True, num_buckets=100)

  # 4つのワーカスレッドを内部に持つ非同期APIアダプタを準備する
  adbm = tkrzw.AsyncDBM(dbm, 4)

  # Setメソッドを非同期に実行する。
  future = adbm.Set("hello", "world")
  # フォアグラウンドで別の処理を行う。
  print("Setting a record")
  # Setメソッドの終了を待ち、その結果を取得する。
  # FutureのGetメソッドを呼ぶと該当操作の終了を待つが、コルーチンの一時停止はしない
  status = future.Get()
  if status != tkrzw.Status.SUCCESS:
    print("ERROR: " + str(status))

  # Getメソッドを非同期に実行する。
  future = adbm.GetStr("hello")
  # フォアグラウンドで別の処理を行う。
  print("Getting a record")
  # 現在のコルーチンを一時停止しつつ、該当操作の終了を待つ
  await future
  # 結果を取り出す。DBMのGetStrメソッドに対応するのはステータスと値の文字列のペア
  status, value = future.Get()
  if status == tkrzw.Status.SUCCESS:
    print("VALUE: " + value)

  # 非同期APIアダプタを破棄する
  adbm.Destruct()

  # データベースを閉じる
  dbm.Close()

asyncio.run(main())

TkrzwのAPIはコルーチンとは独立で使えるので、必ずしもコルーチンのイベントループ内で使わなければいけないわけではない。その場合には、await文の代わりにFutureのWaitメソッドを呼ぶか、そもそもいきなりGetを呼べばよい。await文で複数のコルーチンを切り替えて並行実行したい場合には、そうすることもできるというだけである。

Pythonにおいて、わざわざasyncやawaitといった新しい予約語を追加して後方互換性を一部破壊してまでも非同期コルーチン機構を導入したのは、それなりの理由がある。要は、スレッドの利便性が低いということなのだ。スレッドでなくコルーチンを使えというメッセージがひしひしと伝わってくる。


Rubyの非同期APIは、以下のように使うことができる。構造的にはPythonと全く同じだ。AsyncDBMというアダプタを介してAPIを呼ぶと、各メソッドはFutureというオブジェクトを返す。実際の処理はスレッドプールで管理される別のネイティブスレッドで行われる。結果はFutureオブジェクトのGetメソッドを使って取り出すことができる。RubyのFiberとの連携機能は特に用意していないので、必要であれば適当にラッパーを書いていただく感じだ。

require 'tkrzw'

# データベースを開く
dbm = Tkrzw::DBM.new
dbm.open("casket.tkh", true, truncate: true, num_buckets: 100)

# 4つのワーカスレッドを内部に持つ非同期APIアダプタを準備する
async = Tkrzw::AsyncDBM.new(dbm, 4)

# Setメソッドを非同期に実行する
future = async.set("hello", "world")
# ビジーループを回しつつ、フォアグラウンドで待機する
until future.wait(0)
  puts("Setting a record")
end
#  Setメソッドの結果を取得する
status = future.get
if status != Tkrzw::Status::SUCCESS
  puts("ERROR: " + status.to_s)
end

# Getメソッドを非同期に実行する
future = async.get("hello")
# フォアグラウンドで何かする。
puts("Getting a record")
# 処理待ちと結果の取得を一気に行う
status, value = future.get()
if status == Tkrzw::Status::SUCCESS
  puts("VALUE: " + value)
end

# 非同期APIアダプタを破棄する
async.destruct

# データベースを閉じる
dbm.close


PythonRubyの標準の処理系は、ネイティブスレッドを使って並列処理ができるようになっているが、グローバルインタープリタロック(GIL)によって処理系が保護されているため、PyhonやRubyのコードの処理はシングルスレッドでしか行われない。つまり、それらの言語でスレッドを使っても、同時に実行されるスレッドは一つなので、スループットは上がらない。つまり、並行処理(複数のタスクの開始と終了がオーバーラップし得る)ではあるが並列処理(複数の処理が同時に実行される)ではない。

PythonRubyでも、ファイルI/OやネットワークI/Oなどの、ネイティブスレッドがブロックしそうなネイティブの処理は、グローバルインタープリタロックをアンロックした状態で実行される。よって、ファイルI/Oの完了を待っている間にPython/RubyコードでCPU集中の処理を進めるといったシナリオでは、マルチスレッドによるスループットの向上が図れることになる。しかし、マルチスレッドを使うとスレッド間のコンテキストスイッチによるオーバーヘッドがかかるため、実際にスループットが上がるには、バックグラウンドとフォアグラウンドの処理の双方が相応に重いものであることが前提となる。しかし、PythonRubyのスレッド切り替えのオーバーヘッドに比べると、Tkrzwの単一のクエリは早すぎるのだ。

既存のテストケースは同期APIを前提に書かれている。各スレッドは、全体のイテレーション数をスレッド数で割った回数だけループを実行する。各ループの中ではSetかGetのクエリを1回発行して、即座に結果を待つ。よって、同期APIを使おうが非同期APIを使おうが、単一スレッドの視点では並列化はなされない。複数スレッドを使えば並行実行がなさるが、並列性は上がらない。GIL外でネイティブのAPIを呼ぶようにすれば並列性は上がるはずだが、その利得もスレッド切り替えのオーバーヘッドで相殺されてしまう。

以下の結果は、Pythonの既存のテストケースで、1000万レコードのSetとGetを行ったものだ。GIL外での非同期実行は、クエリの発行は一瞬で終わるのでGIL内で行うが、クエリの待ち合わせはブロックし得るのでGIL外で行っている。非同期APIを使う場合、ワーカスレッドの数を1つと2つで分けて測定した。

GIL内Set GIL内Get GIL外Set GIL外Get
スレッド1:同期 770,865 772,427 734,017 753,586
スレッド2:同期 743,112 205,057 357,597 156,146
スレッド4:同期 136,629 203,444 132,569 140,473
スレッド1:非同期1 123,637 117,478 123,930 115,490
スレッド2:非同期1 119,862 67,287 118,443 70,526
スレッド4:非同期1 67,738 66,724 58,094 59,171
スレッド1:非同期2 110,916 105,851 113,276 107,871
スレッド2:非同期2 110,717 65,119 113,490 66,882
スレッド4:非同期2 63,722 65,350 62,485 67,520

予想に違わず、非同期APIを使ってもスループットは上がらないどころか、落ちる。クエリを発行してすぐに待ち合わせをする使い方は非同期APIが意図するものではなく、単にオーバーヘッドを増やすだけということだ。バックグラウンドで実行しているTkrzwの処理の方が早いので、ワーカスレッドを増やす意味もない。また、GIL内だろうが、GIL外だろうが、スレッドを使うと遅くなる。つまるところ、スループットの視点では、シングルスレッドと同期APIで、全てを直列に行うのが最善ということが改めて確かめられた。


いやいや。まてまて。遅いとけなすばかりじゃ、何のためにコルーチンや非同期APIを実装したのか分からなくなってしまう。わざわざ複雑化させてまでコルーチンや非同期APIを採用するシステムが多いのは、実用上の利点があるからに他ならない。もう少し実際のユースケースに近づけた実験をして、コルーチンと非同期APIの組み合わせの有用性を示したいところだ。

非同期APIスループットが上がる典型的なユースケースは、時間のかかるI/O処理が多重化している場合である。Webのクローラとかがまさにそれだ。それに似たような負荷を掛けるテストを考案した。1000個のコルーチンを同時起動して、個々のコルーチンは100個のレコードのSetもしくはGetを行う。そのイテレーションを100回行う。結果として、合計1000万回のSetまたはGetが行われる。同期APIでは個々のコルーチンは直列にクエリを処理するが、非同期APIでは100個のFutureを作ってからその各々を待ち合わせるので、結果としてネイティブのワーカスレッドが並列に処理を行ってくれる。さらに、敢えてハッシュ表のバケットを少なくしたり、ファイルクラスをメモリマップI/Oから通常I/Oに変更したりして、データベースの処理に時間がかかるようにした。

import asyncio
import time
import tkrzw

NUM_ITERS = 100
NUM_COROUTINES = 1000
NUM_REPEATS = 100

# 同期APIのSetを行うコルーチン
async def set_sync(dbm, cor_id, iter_count):
  for i in range(0, NUM_REPEATS):
    key = str(cor_id * NUM_COROUTINES * NUM_REPEATS + iter_count * NUM_REPEATS + i)
    dbm.Set(key, key).OrDie()

# 非同期APIのSetを行うコルーチン
async def set_async(adbm, cor_id, iter_count):
  futures = []
  for i in range(0, NUM_REPEATS):
    key = str(cor_id * NUM_COROUTINES * NUM_REPEATS + iter_count * NUM_REPEATS + i)
    futures.append(adbm.Set(key, key))
  for future in futures:
    future.Get().OrDie()

# 同期APIのGetを行うコルーチン
async def get_sync(dbm, cor_id, iter_count):
  for i in range(0, NUM_REPEATS):
    key = str(cor_id * NUM_COROUTINES * NUM_REPEATS + iter_count * NUM_REPEATS + i)
    status = tkrzw.Status()
    dbm.Get(key, status)
    status.OrDie()

# 非同期APIのGetを行うコルーチン
async def get_async(adbm, cor_id, iter_count):
  futures = []
  for i in range(0, NUM_REPEATS):
    key = str(cor_id * NUM_COROUTINES * NUM_REPEATS + iter_count * NUM_REPEATS + i)
    futures.append(adbm.Get(key))
  for future in futures:
    status, value = future.Get()
    status.OrDie()

# メインコルーチン
async def main():
  dbm = tkrzw.DBM()
  num_buckets = NUM_ITERS * NUM_COROUTINES * NUM_REPEATS / 2
  dbm.Open("casket.tkh", True, concurrent=True, truncate=True,
       num_buckets=num_buckets, file="pos-para")
  adbm = tkrzw.AsyncDBM(dbm, 4)
  def make_set_sync(cor_id, iter_count):
    return set_sync(dbm, cor_id, iter_count)
  def make_set_async(cor_id, iter_count):
    return set_async(adbm, cor_id, iter_count)
  def make_get_sync(cor_id, iter_count):
    return get_sync(dbm, cor_id, iter_count)
  def make_get_async(cor_id, iter_count):
    return get_async(adbm, cor_id, iter_count)
  confs = [
    {"label": "SET SYNC", "op": make_set_sync},
    {"label": "SET ASYNC", "op": make_set_async},
    {"label": "GET SYNC", "op": make_get_sync},
    {"label": "GET ASYNC", "op": make_get_async},
  ]
  for conf in confs:
    start_time = time.time()  
    for iter_count in range(0, NUM_ITERS):
      coroutines = []
      for cor_id in range(0, NUM_COROUTINES):
        coroutines.append(conf["op"](cor_id, iter_count))
      for coroutine in coroutines:
        await coroutine
    end_time = time.time()
    elapsed = end_time - start_time
    print("{:10s}: {:8.0f} QPS".format(
      conf["label"], (NUM_ITERS * NUM_COROUTINES * NUM_REPEATS) / elapsed))
  adbm.Destruct()
  dbm.Close()

asyncio.run(main())

結果は以下のようになる。

Set Get
同期API 259,672 QPS 356,607 QPS
非同期API 435,695 QPS 455,660 QPS

ついに非同期が勝った! つまるところ、遅いデータベース処理を多重化して呼び出すような場合には、やはり非同期処理に利点があると言えそうだ。データベースの規模が大きくなればこの条件に当てはまる可能性が高くなる。


まとめ。TkrzwのPythonRubyインターフェイスが非同期APIサポートした。普通に使うと非同期APIは同期APIよりも遅いのだが、コルーチンと組み合わせるなどして多重化すれば、全体のスループットが上げられることもある。

実際のところ、データベース処理のスループットを上げるために非同期APIを導入するというよりは、対話型のアプリケーションやネットワークサービスなどで、スレッドのブロックを緩和してレイテンシを安定させるために非同期APIを使うことが多いだろう。今回の実験により、許容可能なオーバーヘッドで非同期APIが利用できることが確かめられた。それで十分だ。