sidekiq の scheduled jobの性能

sidekiq と scheduled job

ruby でよく使われるジョブキューにsidekiqというものがあります。 Rails などでは Active Job のバックエンドとして使うこともできます。

sidekiq は即時的に処理を行うだけではなく、Scheduled Jobという機能があり、例えば以下のようにすると1時間後にjobを実行することができます。

MyJob.perform_in(1.hour)

# active job
MyJob.set(wait: 1.hour).perform_later

また、即時で実行したい場合には以下のようにします

MyJob.perform_async

# active job 
MyJob.perform_later

sidekiq の queue の実装

sidekiqでは、queueにredisを使用しており、即時実行用の queue と scheduled job 用のqueueは異なった実装になっています。 それぞれについて、どのようにjobを溜めているのか、どのようにjobを取り出しているのかをみていきます。

即時実行

即時実行の場合、 redisのリストに

"queue:#{queue_name}"

というkeyでjobがpushされていきます。コードはこの辺です。

キューから取り出すときも単純で、単にリストからpopしているだけです。

scheduled job

scheduled job はソート済みセット型を使用しています。 n時間後に実行の場合、以下のようにtimeオブジェクトを浮動小数点数への変換を行い、変換した値をソート済みセット型のスコアとして使用しています。

# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/worker.rb#L55-L66

def perform_in(interval, *args)
  int = interval.to_f
    now = Time.now.to_f
    ts = (int < 1_000_000_000 ? now + int : int)

    payload = @opts.merge('class' => @klass, 'args' => args, 'at' => ts)
    # Optimization to enqueue something now that is scheduled to go out now or in the past
    payload.delete('at') if ts <= now
    @klass.client_push(payload)
  end
  alias_method :perform_at, :perform_in
end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/worker.rb#L136-L144
def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
  # stringify
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end

  Sidekiq::Client.new(pool).push(item)
end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/client.rb#L69-L77
def push(item)
  normed = normalize_item(item)
  payload = process_single(item['class'], normed)

  if payload
    raw_push([payload])
    payload['jid']
  end
end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/client.rb#L181-L206
def raw_push(payloads)
  @redis_pool.with do |conn|
    conn.multi do
      atomic_push(conn, payloads)
    end
  end
  true
end

def atomic_push(conn, payloads)
  if payloads.first['at']
    conn.zadd('schedule', payloads.map do |hash|
      at = hash.delete('at').to_s
      [at, Sidekiq.dump_json(hash)]
    end)
  else
    q = payloads.first['queue']
    now = Time.now.to_f
    to_push = payloads.map do |entry|
      entry['enqueued_at'] = now
      Sidekiq.dump_json(entry)
    end
    conn.sadd('queues', q)
    conn.lpush("queue:#{q}", to_push)
  end
end

続いて、queueからjobを取り出す処理を見ていきます。

  • ソート時みセット型から、Time.now.to_f.to_s 以下のスコアのjobを取得
  • 取得したjobをソート済みセット型から削除を試みる
  • 削除が成功すると、取得したjobを即時実行のqueueにpushする

という処理を行なっています

# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/scheduled.rb#L11-L33

def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  Sidekiq.redis do |conn|
    sorted_sets.each do |sorted_set|
      # Get the next item in the queue if it's score (time to execute) is <= now.
      # We need to go through the list one at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled queue and when
      # they are pushed onto a work queue and losing the jobs.
      while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do

        # Pop item off the queue and add it to the work queue. If the job can't be popped from
        # the queue, it's because another process already popped it so we can move on to the
        # next one.
        if conn.zrem(sorted_set, job)
          Sidekiq::Client.push(Sidekiq.load_json(job))
          Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
        end
      end
    end
  end
end

queueの実装のまとめ

sidekiqでは、即時実行のqueueからjobをpopし、実行するやつはworker、scheduled job のqueue からjobを取得し即時実行のqueueにpushするやつはpooler と呼ばれておりそれぞれ別のスレッドで動いています。

ごちゃごちゃ書きましたが、文章だけだとわかりづらいので図にするとこんな感じです。

f:id:ogidow:20180625103131p:plain

scheduled job のパフォーマンス

ようやく本題です。 ある時点に多くの job を schedule すると、scheduled job の性能が悪くなります。 ここでいう性能が悪いというのは「n分後にscheduleしたが、実際に実行されるまでに時間がかかる」ということを意味しています。

処理量が増えるので、1つの poller あたりの処理量が頭打ちになり性能が悪くなるのは当たり前と思うかもしれません。 しかし、この問題は poller を増やしてもあまり改善しません。 「なぜpoller を増やしても改善しないのか、また即時実行の場合はworker を増やせば大量の処理を捌けるのか」ということを簡単な実験をしながら検証していきます。

即時実行の場合

上で説明した通り、即時実行の場合はredisのリスト型の queue にobが積まれており、worker が queue に積まれているjobを取り出し実行します。

簡易的に queue に 10万件の job が積まれていることを以下のように表現します。

require 'redis'
redis = Redis.new(host: '127.0.0.1', port: '6379')

100000.times do |i|
  redis.rpush('hoge', i)
end

そして、次のように 並列数(worker の数)を増やしつつ、job を 全て queue から取り出す速度を計測します。

require 'benchmark'
require 'redis'
require 'connection_pool'

number_of_worker = 5

Benchmark.realtime do
  Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i|
    redis_pool.with do |conn|
      while job = conn.rpop('hoge') do
      end
    end
  end
end

結果は以下のようになりました。

並列数 実行時間
1 21.21864500000038
3 19.56006800000023
5 15.638484000000062
7 13.080714000000171
10 11.504145999999764

並列数が増えるにしたがって、実行時間が短くなっていることがわかります。 このことから、即時実行でパフォーマンスに問題がある場合に、worker の数を増やすことは一定の効果があると考えられます。

scheduled job の場合

scheduled job の場合はソート済みセット型をqueueとして利用しているので、以下のように10万件の job が積まれていることを表現します。

require 'redis'
redis = Redis.new(host: '127.0.0.1', port: '6379')

100000.times do |i|
  redis.zadd('hoge', rand, i)
end

そして、同じように 並列数を増やしつつ、job を 全て queue から取り出す速度を計測します。

require 'benchmark'
require 'redis'
require 'connection_pool'

number_of_worker = 5

Benchmark.realtime do
  Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i|
    redis_pool.with do |conn|
      while job = conn.zrangebyscore('hoge', 0, 1, :limit => [0, 1]).first do
        conn.zrem('hoge', job)
      end
    end
  end
end

ポイントは即時実行の場合はqueueから job を取得する処理とqueueから削除する処理が同時にできるのに対し、 scheduled job では取得と削除を別々にやってます。

計測結果は以下のようになりました。

並列数 実行時間
1 46.90739100000064
3 82.63857199999984
5 125.35556799999995
7 111.53749299999981
10 150.0042739999999

並列数が増えるに連れて実行時間は長くなる傾向にあるようです。

scheduled job では並列で、queue から job の取得・削除を行うので以下のずのように競合が発生します。 競合が発生すると、片方の poller の処理が無駄になり効率が悪くなってしまいます。 並列数が増えると競合する確率が高くなり、処理の効率が落ち結果的に実行時間が長くなってしまうのかもしれません。

f:id:ogidow:20180626134443p:plain

実際にどのくらいの回数競合しているかを調べて見ます。

redis.del 'conflict'

Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i|
    redis_pool.with do |conn|
      while job = conn.zrangebyscore('hoge', 0, 1, :limit => [0, 1]).first do
        unless conn.zrem('hoge', job)
          conn.rpush('conflict', rand)
        end
      end
    end
  end

redis.llen 'conflict'
並列数 競合数
1 0
3 90490
5 164632
7 219304
10 308126

並列数が増えるに連れて、競合数も増えていることがわかります。 今回の実験では job数が10万件だったので、単純に10万回処理すれば良いはずですが、並列数10の場合は 308126回も処理が無駄になっています。

もちろん、実際は各poller が queue をポーリングするタイミングはもう少しばらけますが、poller が増えれば競合する確率も高くなるということには変わりありません。

同じ時間帯に大量の job を scheduling していて、実際に実行される時間までラグがあるという場合には、処理を1つのjobにできるだけまとめて、jobの数自体を減らすなどの工夫が必要かもしれません*1

*1:処理をまとめるかつ冪等なjobを書かないといけないという難しさはありますが...