Elasticsearch を最近メインで使うようになり強力なデータストアだなぁと実感する日々ですが、大量のドキュメントを検索し、ページング処理をしたい時に、ちょっと面倒くさいなと思ったので、ActiveRecord の find_each メソッドのような使い勝手を enum_for を使って再現してみました。

利用している Gem は elasticsearch-persistence です。

ちなみに、Rails で Elasticsearch を気軽に使うための公式ライブラリはいくつかあります。

  • elasticsearch-model : ActiveRecord のモデルとの統合をしてくれる
  • elasticsearch-rails : Rails インテグレーション系のサポート
  • elasticsearch-persistence : モデルやテーブル関係なく、Elasticsearch の永続層を提供してくれる

今回は、データは Elasticsearch のみにあるので、ActiveRecord のモデル統合や Rails インテグレーションは必要ないので elasticsearch-persistence のみを利用しました。

Elasticsearch のページング

Elasticsearch では 1 クエリあたり最大 10,000 件のドキュメントを返します。それ以降は、ページング処理が必要になります。ページングの方法はいくつかありますが、今回はシンプルに search_after を使います。もし検索時点でのデータ整合性を保ちたい場合は Point In Time API を利用するといいでしょう。

リポジトリクラスを作る

elasticsearch-persistence gem は version6 より前は ActiveRecord と Repository パターンをサポートしていたようですが、今は Repository パターンのみをサポート推奨しているようです。

確かに永続層を提供してくれる elasticsearch-persistence gem の場合、ActiveRecord と密になるよりは Repository パターンで切り出して、モデルは別クラスで作った方が使い勝手が良さそうです。

こんな感じで Repository クラスとモデルクラスを作ってあげるだけで、Elasticsearch のドキュメントを Artist クラスオブジェクトとして自動的に変換してくれます。便利ですね。Repository の詳しい扱い方については上記のブログ記事を参照してください。

class ArtistRepository
  include Elasticsearch::Persistence::Repository
  include Elasticsearch::Persistence::Repository::DSL
  
  klass Artist
end

class Artist
  attr_accessor :name

  def to_hash
    {
      id: id,
      name: @name
    }
  end
end

find_each 風にしてみる

前置きが長くなりましたが、ここからページング処理を find_each 風に扱えるようにします。いきなり実装コードです。

class ArtistRepository
  include Elasticsearch::Persistence::Repository
  include Elasticsearch::Persistence::Repository::DSL
  
  klass Artist
  
  BATCH_SIZE = 10000
  
  def find_each(query:)
    if block_given?
      find_in_batches(query: query) do |records|
        records.each { |record| yield record }
      end
    else
      enum_for(:find_each, query: query)
    end
  end
  
  def find_in_batches(query:)
    unless block_given?
      return to_enum(:find_in_batches, query: query)
    end

    search_after = {}

    # ページングでデータがなくなるまで取り続ける
    loop do
      params = {
        query: query,
        size: BATCH_SIZE,
        sort: [
          { name: 'asc' } # 名前順にソートしている
        ]
      }.merge(search_after)

      batch = search(params)

      yield batch

      break if batch.size < BATCH_SIZE

      # 今取得したドキュメントの最後の名前を search_after パラメータに指定する
      search_after = { search_after: [batch.last.email] }
    end
  end
end

ちょっと長くなりましたが、これでこのように使えます。

query = { bool: { must: { match: { foo: 'bar' } } } }
ArtistRepository.new.find_each(query: query) to |artist|
  puts artist.name
end

もし、10000 件以上のドキュメントが存在していても、自動で search_after パラメータを指定してページングをしてくれます。便利ですね。find_each 風のメソッドを作るのに yield / enum_for / enum_to を使いました。Ruby はこの手の処理を簡単に書けるので楽ですね。

Elasticsearch 関係なくもっとシンプルなサンプルコードはこちらです

class Batches
  BATCH_SIZE = 2
  
  def initialize
    @data = (1..10).to_a
  end
  
  def find_each
    if block_given?
      find_in_batches do |records|
        records.each { |record| yield record }
      end
    else
      enum_for(:find_each)
    end
  end
  
  def find_in_batches
    unless block_given?
      return to_enum(:find_in_batches)
    end

    @start = 0
    loop do
      batch = @data[@start, BATCH_SIZE]
      yield batch
      @start += BATCH_SIZE
      break if batch.size < BATCH_SIZE
    end
  end
end

# 直接イテレート
Batches.new.find_each do |record|
  puts record
end

# Enumerator オブジェクト受け取ってからイテレート
result = Batches.new.find_each
result.each do |record|
  puts record
end