豪鬼メモ

一瞬千撃

ダイレクトI/O用のキャッシュ実装

ハードウェアと直接お話するダイレクトI/Oが遅いのは仕方ないことだが、アプリケーション層で小さなキャッシュ機構を使うと大分マシになるという話。


ファイルシステムのキャッシュを回避して、システムのメモリ空間を消費せずに入出力を行うのがダイレクトI/Oである。前回も述べたが、ダイレクトI/Oは宿命的に遅い。遅いながらも、アクセスパターンに局所性がまったくない場合は、システム全体にかける負荷を軽減できるので、有用である。今回は、前回述べたダイレクトI/Oを隠蔽するファイルクラスにキャシュ機構を導入して高速化を図る。

システム層のキャッシュを回避しておきながら自前のキャッシュを導入して高速化を唄うようなマッチポンプ行為に意味があるのか。微妙なところだが、特定のケースでは、ある。アクセスパターンに局所性がない場合にダイレクトI/Oを使うと述べたが、局所性のないアクセスパターンと局所性のあるアクセスパターンが混在するユースケースもある。その際に、容量を限定した小さなキャッシュがあると、メモリ使用量やCPU負荷を限定しつつも、局所性のあるアクセスパターンだけを高速化することができる。

典型的な例は、二分探索である。最初のステップでは検索空間のサイズの半分もの領域をジャンプしながら検索を行うが、次のステップでは、ジャンプする幅は半分になる。ステップ数が増えるにつれて、ジャンプする幅は小さくなり。最後の方は近接または隣接した領域をウロウロすることになる。つまり、検索操作の最初の方は局所性が低いが、最後の方は局所性が高いと言える。

Trkzwには、スキップリスト上で二分探索を実装したデータベースであるSkipDBMがある。設計時の記事でも述べたが、スキップ幅を要素のインデックスから決定的に計算するようにすると、平衡木として検索操作を行うことができる。そうすると、根に近い上位ノードはアクセスが集中するので、局所性が上がる。さらに、それら上位ノードはデータベースオブジェクトがキャッシュすることで高速化を図っている。検索操作のステップが上位ノードを過ぎると、キャッシュしきれない広い範囲の領域にランダムアクセスすることになる。これは、しょうがない。それをさらに過ぎると、二分探索の特性により、近接または隣接する狭い領域でのランダムアクセスを行う。検索操作の最後に訪れる狭い領域でのランダムアクセスは、ブロック単位で考えると、同じブロックか隣接するブロックに対する連続したアクセスになる。これは、ファイルオブジェクト内のキャッシュで高速化できる典型的なパターンだ。

SkipDBMの更新は、マージソートを使って行われるので、基本的にシーケンシャルアクセスのみになる。読み込みはファイルの先頭から行われ、書き込みはファイルの末尾に集中する。これも、ファイルオブジェクト内のキャッシュで高速化できる典型的なパターンだ。

ファイル上のハッシュテーブルの実装であるHashDBMの検索は、この方法では全く高速化しない。ハッシュテーブルのバケット表のアクセスはファイルの特定の領域(先頭)に偏るが、それに関してはデータベースオブジェクトがキャッシュするようになっている。レコードへのアクセスはほぼ1回のランダムアクセスで行うので、局所性が全く無い。既にHashDBMはダイレクトI/Oで最も効率的に検索できる構造であり、これ以上高速化できない。ただし、検索は高速化しないが、更新は高速化できる。新しいレコードや、追記モードで更新されたレコードは、常にファイルの末尾に追加されるので、局所性が高い。レコードのサイズがブロックサイズよりも小さい場合には、同じブロックを何度も更新することになる。これも、ファイルオブジェクト内のキャッシュで高速化できる典型的なパターンだ。

ファイル上のB+木の実装であるTreeDBMでは、検索も更新も、この方法では全く高速化しない。TreeDBMにおいてはB+木のノードはLRUキャッシュで管理し、ノードとファイルの同期はページ単位の入出力として、ブロックサイズにアラインメントされて行われる。既に効率的なキャッシュ機構がデータベースオブジェクトに内蔵されているとも言えるので、新たにファイルオブジェクトでキャッシュ機構を使う意味はない。

つまり、SkipDBMの検索と更新、そしてHashDBMの更新に関しては、ファイルオブジェクトにキャッシュ機構を設ければ、ダイレクトI/Oモードでのハードウェアとの間の入出力を抑制し、高速化を図ることができる。そもそもそれらの操作ではダイレクトI/Oを使わなきゃいいじゃないかという話もあるのだが、大規模なデータベースの運用を行いつつ、ファイルシステムのキャッシュをどうしても汚したくないというユースケースはあり得る。

そもそもキャッシュ機構はデータベース層で実装すべきであってファイル層で実装すべきじゃないんじゃないかと思ったが、そうするとファイル層の特性毎に、つまりメモリマップI/Oか通常I/OかブロックI/O(=ダイレクトI/O)かによって、最適化された入出力の挙動を書き分けなきゃならなくなるので、保守しきれなくなる恐れがある。よって、ブロックI/Oに特化したキャッシュ機構は、ブロックI/O用のファイルクラスに内蔵するのが現実的だ。


設計に入ろう。まず、PageCacheというデータ構造を考える。これは、ファイルシステムのページキャッシュ機構に似た機能を提供するが、より小規模のデータの管理を想定する。上述した一時的な局所性で扱うブロックの数はせいぜい10個かというところであり、マルチスレッドで同時にアクセスすることを考えても、その何倍かで十分だろう。

class PageCache {
 public:
  typedef std::function<Status(int64_t off, void* buf, size_t size)> ReadType;
  typedef std::function<Status(int64_t off, const void* buf, size_t size)> WriteType;
  
  PageCache(int64_t page_size, int64_t capacity, ReadType read_func, WriteType write_func);
  
  Status Read(int64_t off, void* buf, size_t size);
  
  Status Write(int64_t off, const void* buf, size_t size);

  Status Flush();
};

コンストラクタでは、ページサイズと、キャシュの容量と、指定した領域を読み込むためのコールバックと、指定した領域に書き込むためのコールバックを受け取る。入出力の処理を担うコールバックを注入するdependency injectionを採用することで、ランダムアクセス可能ないかなるストレージにも対応できるようになる。

Readメソットでは、指定された領域のページがキャッシュにあればそのまま使い、そうでなければコールバックを呼んでキャッシュ上にページを構築した上で、ページ上のデータをコピーする。Writeメソッドでは、指定された領域のページがキャッシュにあればそのまま使い、そうでなければコールバックを呼んでキャッシュ上のページを構築した上で、ページ上のデータを更新する。ReadにおいてもWriteにおいても、キャッシュの容量が溢れた場合には、古いページを捨てるが、一度以上の更新が行われたダーティなページはコールバックを呼んで書き戻す。Flushは全てのダーティなページを書き戻す。

ReadやWriteでは、入出力の対象領域のオフセットやサイズをページサイズにアラインメントさせずに呼び出すことを許す。ページ分割は内部で隠蔽される。ページサイズは利用者が自由に決めて良いが、現実的にはブロックサイズ512の倍数にすることだろう。ReadやWriteの中で暗黙的にブロック単位に分割し、適当なタイミングでファイルに書き出してくれる。

各ページのバッファの構造は以下のようになる。ページの実体データであるバイト列は、ブロックサイズにアラインメントされたメモリ領域bufで保持する。そうすると下層のI/Oで再びアラインメントをする必要がないので、効率的だ。offはファイル内のオフセットで、sizeは有効な領域の長さだ。このキャッシュ機構はダイレクトI/O以外でも利用できるようにしたいが、その際にはファイルの末尾のブロックの有効データはブロックサイズと同じであるとは限らないため、有効データのサイズを保持する必要がある。dirtyは、そのページが読み込まれてから一度以上更新されたかどうかを示す。更新されていたなら、キャッシュが消去される前に書き戻さねばならない。

struct Page {
  char* buf;
  int64_t off;
  int64_t size;
  bool dirty;
}

キャッシュされたページは以下の構造で管理される。ページの集合は単なる連結リストとして保持する。ベクタでなく連結リストなのは、キャッシュをFIFO(作成順が古いものから消える)として扱うのに最適だからだ。setやmapでないので線形探索しかできないが、小規模のデータしか扱わないことを前提としているので、問題ない。

struct Slot {
  std::list<Page> pages;
  std::mutex mutex;
};

キャッシュオブジェクトは、上記のスロット構造を16個保持する。ページの先頭アドレスでスロットを決定し、ページオブジェクトの寿命管理や排他制御はスロット単位で行う。ページを探す、なければ作る、使う、書き戻す、破棄するという一連のライフサイクルはmutexで保護しなければならない。キャッシュオブジェクト全体で排他制御を行うとI/Oの並列性がなくなってしまうので嫌だ。かといってページ単位にmutexを割り当てるのはコストが高すぎる。よって、16個に分けたスロット単位で排他制御をすることで、現実的に十分な並列性をそこそのコストで実現するのだ。ちなみにB+木のキャッシュの管理でもまさに同じ方法を採用している。

ここまで設計が決まれば、実装は難しくない。Readはこんな感じで実装できる。もしロックをしないと、キャッシュを使っている最中に別のスレッドが古いキャッシュとして破棄してしまったり、連結リストの操作が破綻したり、いろいろ恐ろしいことが起こってしまう。

Status PageCache::Read(int64_t off, void* buf, size_t size) {
  // 読み込んだデータを書き込むアドレス
  char *wp = static_cast<char*>(buf);
  // オフセットの位置がブロック単位でない場合、直前のブロック境界から読む
  const int64_t off_rem = off % page_size_;
  if (off_rem != 0) {
    // ブロックの先頭位置を見る
    const int64_t page_off = off - off_rem;
    // ブロックに該当するスロットを取得
    const int32_t slot_index = (page_off / page_size_) % NUM_SLOTS;
    auto& slot = slots_[slot_index];
    // 読み込む長さを計算
    const int64_t data_size = std::min<int64_t>(size, page_size_ - off_rem);
    // スロットをロックして、該当のブロックのキャシュを取得する
    std::lock_guard<std::mutex> lock(slot.mutex);
    Page* page = nullptr;
    Status status = PreparePage(&slot, page_off, page_size_, &page);
    if (status != Status::SUCCESS) {
      return status;
    }
    // キャッシュ上のデータを呼び出し側のバッファにコピー
    std::memcpy(wp, page->buf + off_rem, data_size);
    wp += data_size;
    off += data_size;
    size -= data_size;
    // 古いキャッシュを破棄する
    status = ReduceCache(&slot);
    if (status != Status::SUCCESS) {
      return status;
    }    
  }
  // 先頭以外のブロックでも同じ事を行う
  while (size > 0) {
    const int32_t slot_index = (off / page_size_) % NUM_SLOTS;
    auto& slot = slots_[slot_index];
    const int64_t data_size = std::min<int64_t>(size, page_size_);
    std::lock_guard<std::mutex> lock(slot.mutex);
    Page* page = nullptr;
    Status status = PreparePage(&slot, off, data_size, &page);
    if (status != Status::SUCCESS) {
      return status;
    }
    std::memcpy(wp, page->buf, data_size);
    wp += data_size;
    off += data_size;
    size -= data_size;
    status = ReduceCache(&slot);
    if (status != Status::SUCCESS) {
      return status;
    }    
  }
  return Status(Status::SUCCESS);
}

Writeの実装もだいたい同じ感じだ。実際のコードを見ていただくほうがわかりやすいと思う。この実装はダイレクトI/O以外でも有用で、読み込みや書き込みのコストの高いいかなるシーケンスの管理にも使える。

テストコードから使い方を抜粋してみよう。ファイルでなくバッファの読み書きをするコールバックを渡すことで、テストも用意になる。DI万才。

TEST(FileUtilTest, PageCache) {
  constexpr int64_t file_size = 8192;
  char file_buffer[file_size];
  for (int32_t i = 0; i < file_size; i++) {
    file_buffer[i] = '0' + i % 10;
  }
  auto read_func = [&](int64_t off, void* buf, size_t size) {
                     if (size + off > file_size) {
                       return tkrzw::Status(tkrzw::Status::INFEASIBLE_ERROR);
                     }
                     std::memcpy(buf, file_buffer + off, size);
                     return tkrzw::Status(tkrzw::Status::SUCCESS);
                   };
  auto write_func = [&](int64_t off, const void* buf, size_t size) {
                      if (size + off > file_size) {
                        return tkrzw::Status(tkrzw::Status::INFEASIBLE_ERROR);
                      }
                      std::memcpy(file_buffer + off, buf, size);
                      return tkrzw::Status(tkrzw::Status::SUCCESS);
                    };
  tkrzw::PageCache cache(10, 2048, read_func, write_func);
  EXPECT_EQ(tkrzw::Status::SUCCESS, cache.Read(5, buf, 10));
  EXPECT_EQ("5678901234", std::string_view(buf, 10));
  EXPECT_EQ(tkrzw::Status::SUCCESS, cache.Read(0, buf, 20));
  EXPECT_EQ("01234567890123456789", std::string_view(buf, 20));
  cache.Clear();
  EXPECT_EQ(tkrzw::Status::SUCCESS, cache.Write(0, "ABCDE", 5));
  EXPECT_EQ(5, cache.GetRegionSize());
  EXPECT_EQ(tkrzw::Status::SUCCESS, cache.Read(0, buf, 10));
  EXPECT_EQ("ABCDE56789", std::string_view(buf, 10));
  EXPECT_EQ(5, cache.GetRegionSize());
  EXPECT_EQ(tkrzw::Status::SUCCESS, cache.Write(5, "FGHIJ", 5));
  EXPECT_EQ(tkrzw::Status::SUCCESS, cache.Read(0, buf, 10));
  EXPECT_EQ("ABCDEFGHIJ", std::string_view(buf, 10));
  EXPECT_EQ(10, cache.GetRegionSize());
  EXPECT_EQ(tkrzw::Status::SUCCESS, cache.Write(8, "XXXX", 4));
  EXPECT_EQ(tkrzw::Status::SUCCESS, cache.Write(18, "YYYY", 4));
  EXPECT_EQ(tkrzw::Status::SUCCESS, cache.Read(0, buf, 25));
  EXPECT_EQ("ABCDEFGHXXXX234567YYYY234", std::string_view(buf, 25));
  EXPECT_EQ("0123456789012345678901234", std::string_view(file_buffer, 25));
  EXPECT_EQ(tkrzw::Status::SUCCESS, cache.Flush());
  EXPECT_EQ("ABCDEFGHXXXX234567YYYY234", std::string_view(file_buffer, 25));
  EXPECT_EQ(22, cache.GetRegionSize());
  cache.Clear();
  EXPECT_EQ(0, cache.GetRegionSize());
}

実際にはマルチスレッドで負荷をかけるテストケースとかもあるんだけど、割愛。スレッド系のバグは後で出てくると本当に困るので、UIレベルで撲滅したいところ。

ここで取り上げた局所性は、DB層の1回の検索処理にかかる複数のファイルアクセスで起こる局所性だ。つまりその複数のアクセスは単一のスレッドが引き起こすのが普通だ。ということは、スレッドローカルなキャッシュ機構を提供すれば、排他制御なしで効率的に動作させることもできる。しかし、それは採用しなかった。更新処理を考えると、どうしても全スレッドでキャッシュを共有せねばならないのだ。読み込み専用モードに最適化した実装としてならスレッドローカルでもいいけども、複数の実装をメンテするほどの利点はないだろう。


キャッシュはデータの読み書きを遅延させる仕組みなので、それに伴う副作用もある。プロセス内のキャッシュの更新は、それを実際に書き込むまでは、プロセス外からは読み取れない。また、他のプロセスが同じファイルを更新したとしても、再読み込みをするまでは、プロセス内のキャッシュに反映されることはない。さらに言えば、実装の都合で、ファイルオブジェクト内でキャッシュを管理している場合、同じファイルを別のオブジェクトが開いた場合に、同様の非同期性問題が生じてしまう。

実際のところ、キャッシュを有効にしたファイルオブジェクトを注入したデータベースオブジェクトに再構築をかけたところ、データが壊れるバグが発生した。再構築中も通常の更新処理のスレッドをブロックしないように、再構築専用のデータベースオブジェクトを別のファイルオブジェクトを使って開くというトリッキーな実装になっていたのが問題となった。ただし、これを発見できたのはむしろ幸運だった。たとえmmapを使ったファイルオブジェクトでも非同期性の問題は起き得るので、同じファイルオブジェクトを使い回すべきだったのだ。

そのバグを直したところ、キャッシュを有効にしたダイレクトI/Oのデータベースが通常のI/Oモードのデータベースと全く同じように動作するようになった。Windows上でもLinux上と同じように動く。HashDBMもTreeDBMもSkipDBMも全てちゃんと動く。


さて、これをファイルクラスに組み込んで性能測定したいわけだが、やたら記事が長くなったので次回に持ち越そう。