今回の設計の要点の一つは、並列性である。ハッシュテーブルを保護するにあたって利用するスロットロック法について考察してみたい。
ハッシュテーブルの個々のバケットは、独立した排他制御で保護するのが望ましい。実際にはバケット数の分だけmutexを作ることはできないので、256個とかの固定数のmutexの配列を用意して、バケット数とスロット数の剰余でmutexを割り当てるスロットロック法を用いる。そして、個々の排他制御にはshared_mutexすなわちreader-writerロックを用いる。更新系のスレッドは同時に1スレッドしか該当バケットにアクセスできないが、参照系のスレッドは何スレッドでも同時にアクセスにできる。異なるバケット同士なら更新系であっても同時にアクセスできるので、衝突確率は1/256以下ということになる。
今回の実装では、スレッド並列性をとことんまで追求する。データベースを再構築して記憶領域を縮小する際についでに行われるリハッシュ操作ですら、通常のアクセス操作をブロックせずに並列に行いたい。再構築の作業中は元のデータベースと新しいデータベースの両方に更新を適用することで並列制を確保できるが、問題は、新しいデータベースを元のデータベースと置き換える際に発生する。バケットをロックしようとして待っているスレッドがいる状態で、全体のバケット数を変えなければいけないからだ。バケット数を変えればロックすべきスロットのインデックスが変わってしまうので、待っているスレッドにロックの再取得を促さねばならない。
ハッシュ関数の議論でハッシュ関数の計算にバケット数を組み込むことになったが、そのバケット数はリハッシュ操作で変わってしまう。スロットの計算法もリハッシュ操作で変わってしまう。となると、ハッシュ値の計算とバケットインデックスの取得とスロットロックの確保を一度に行う機構が必要になる。それが今回「ハッシュロック」と呼ぶクラスである。以下のメソッドが肝である。
class HashMutex { public: HashMutex(int64_t num_buckets, int32_t num_slots); int64_t LockOne(std::string_view data); void UnlockOne(int64_t bucket_index); void LockAll(); void UnlockAll(); void Rehash(int64_t num_buckets); };
LockOneに任意の文字列を与えると、内部でハッシュ値を計算し、現在のバケット数に応じたスロットをロックした上で、バケットインデックスを返す。バケットの操作が終わったらそのインデックスを指定してアンロックする。LockAllは全てのバケットをロックする。全てのロックを保持しているスレッドは、データベースファイル全体に任意の更新操作を行うことができる。それを利用して再構築時のファイルの挿げ替えを行う。さらに、LockAll後のスレッドは、Rehashを呼んでバケット数を変更することも許される。それが終わったらUnlockAllでバケット全体をアンロックする。
LockOneの実装が面白い。ロックをする前にバケット数を調べておき、ロックを取得した後にもバケット数を調べて、それがもし違っていれば、ロックの再取得を自主的に行う。もし一貫したロックが獲得できていれば、ロック成功とみなして抜ける。Compare-and-Swapをロックに応用した感じだ。UnlockOneは単に指定されたバケットに対応するスロットをアンロックするだけで良い。
int64_t HashMutex::LockOne(std::string_view data); while (true) { const int64_t old_num_buckets = num_buckets_.load(); const int64_t bucket_index = PrimaryHash(data, old_num_buckets); const int32_t slot_index = bucket_index % num_slots_; slots_[slot_index].lock_shared(); if (num_buckets_.load() == old_num_buckets) { return bucket_index; } slots_[slot_index].unlock_shared(); } return -1; } void HashMutex::UnlockOne(int64_t bucket_index) { const int32_t slot_index = bucket_index % num_slots_; slots_[slot_index].unlock(); }
LockOneにはバケットインデックスを受け取るバージョンも用意する。これは各バケットを順に調べて全てのレコードを辿るイテレータで用いる。イテレーションの途中でリハッシュが起きると全体の整合性(各レコードを1回ずつ読む)が保証できないので、その際には失敗を報告する必要がある。つまりロックに失敗できることが必要だ。
bool HashMutex::LockOne(int64_t bucket_index) { const int64_t old_num_buckets = num_buckets_.load(); if (bucket_index >= old_num_buckets) { return false; } const int32_t slot_index = bucket_index % num_slots_; slots_[slot_index].lock(); if (num_buckets_.load() == old_num_buckets) { return true; } slots_[slot_index].unlock(); return false; }
LockAllは全てのバケットをロックするのだが、LockOneでバケットを確保しているスレッドが一つもいなくなるまで待つ必要がある。それには0からスロット数の上限まで順番にロックしていけば良い。UnlockAllでは逆にスロット数の上限から0までの順でアンロックしていくことにより、デッドロックが回避できる。この順序はめちゃ重要。
void HashMutex::LockAll() { for (int32_t i = 0; i < num_slots_; i++) { slots_[i].lock(); } } void HashMutex::UnlockAll() { for (int32_t i = num_slots_ - 1; i >= 0; i--) { slots_[i].unlock(); } }
Rehashでは、単にバケット数を変更すれば良い。ちなみにバケット数はアトミック整数(std::atomic_int64_t)として表現しているので、LockOneでロックする前に参照されても安全だ。RehashはLockAllしたスレッドにしか呼ばれない前提なので、同時に書き換えられることはない。
void HashMutex::Rehash(int64_t num_buckets) { num_buckets_.store(num_buckets); }
LockOneとUnlockOneには占有ロックでなく共有ロックを使うバージョンであるLockOneSharedとUnlockOneSharedも用意する。読み込みだけのスレッドはそれらを用いる。同様に、LockAllとUnlockAllの共有ロック版であるLockAllSharedとUnlockSharedも用意する。データベース再構築の前準備である読み取り操作にはそれらを用いることになるだろう。
いちおう、こんな感じのテストで動作確認をしている。これがきちんと動くなら、リハッシュしまくっても大丈夫。多い日も安心。
TEST(ThreadUtilTest, HashMutex) { constexpr int32_t num_threads = 5; constexpr int32_t num_iterations = 20000; constexpr int32_t num_slots = 10; constexpr int32_t max_buckets = 100; tkrzw::HashMutex mutex(num_slots, max_buckets); std::vector<uint8_t> buckets(max_buckets, false); auto func = [&](int32_t id) { std::mt19937 mt(id); std::uniform_int_distribution<int32_t> rehash_dist(0, num_iterations / 10 - 1); std::uniform_int_distribution<int32_t> record_dist(0, tkrzw::INT32MAX); std::uniform_int_distribution<int32_t> num_buckets_dist(1, max_buckets); for (int32_t i = 0; i < num_iterations; i++) { if (rehash_dist(mt) == 0) { mutex.LockAll(); mutex.Rehash(num_buckets_dist(mt)); mutex.UnlockAll(); } else { const std::string& key = ToString(record_dist(mt)); const int64_t bucket_index = mutex.LockOne(key); EXPECT_FALSE(values[bucket_index]); buckets[bucket_index] = true; std::this_thread::yield(); EXPECT_TRUE(values[bucket_index]); buckets[bucket_index] = false; mutex.UnlockOne(bucket_index); } } }; std::vector<std::thread> threads; for (int32_t i = 0; i < num_threads; i++) { threads.emplace_back(std::thread(func, i)); } for (auto& thread : threads) { thread.join(); } }
テーブルロックをかけるなら一行で済む話が、スロットロックにするだけで鬼のように複雑化するのが辛い。だからこそ面白いとも言えるが。この一手間をかけると、テーブルロックをかけずにデータベースの再構築ができる前提が整う。今回の実装では追記型の更新モードの導入が目玉の一つなのだが、追記型だとひたすらファイルサイズが増大していくので、定期的にデータベースの再構築をすることが求められる。しかし、オンラインサービスのバックエンドとしてDBMを用いることを想定すると、再構築中にもスレッドがブロックすることは避けたい。よって、このハッシュロックの実装はかなり重要なのだ。