sidekiq の scheduled jobの性能
sidekiq と scheduled job
ruby でよく使われるジョブキューにsidekiqというものがあります。 Rails などでは Active Job のバックエンドとして使うこともできます。
sidekiq は即時的に処理を行うだけではなく、Scheduled Jobという機能があり、例えば以下のようにすると1時間後にjobを実行することができます。
MyJob.perform_in(1.hour) # active job MyJob.set(wait: 1.hour).perform_later
また、即時で実行したい場合には以下のようにします
MyJob.perform_async # active job MyJob.perform_later
sidekiq の queue の実装
sidekiqでは、queueにredisを使用しており、即時実行用の queue と scheduled job 用のqueueは異なった実装になっています。 それぞれについて、どのようにjobを溜めているのか、どのようにjobを取り出しているのかをみていきます。
即時実行
即時実行の場合、 redisのリストに
"queue:#{queue_name}"
というkeyでjobがpushされていきます。コードはこの辺です。
キューから取り出すときも単純で、単にリストからpopしているだけです。
scheduled job
scheduled job はソート済みセット型を使用しています。 n時間後に実行の場合、以下のようにtimeオブジェクトを浮動小数点数への変換を行い、変換した値をソート済みセット型のスコアとして使用しています。
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/worker.rb#L55-L66 def perform_in(interval, *args) int = interval.to_f now = Time.now.to_f ts = (int < 1_000_000_000 ? now + int : int) payload = @opts.merge('class' => @klass, 'args' => args, 'at' => ts) # Optimization to enqueue something now that is scheduled to go out now or in the past payload.delete('at') if ts <= now @klass.client_push(payload) end alias_method :perform_at, :perform_in end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/worker.rb#L136-L144 def client_push(item) # :nodoc: pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool # stringify item.keys.each do |key| item[key.to_s] = item.delete(key) end Sidekiq::Client.new(pool).push(item) end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/client.rb#L69-L77 def push(item) normed = normalize_item(item) payload = process_single(item['class'], normed) if payload raw_push([payload]) payload['jid'] end end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/client.rb#L181-L206 def raw_push(payloads) @redis_pool.with do |conn| conn.multi do atomic_push(conn, payloads) end end true end def atomic_push(conn, payloads) if payloads.first['at'] conn.zadd('schedule', payloads.map do |hash| at = hash.delete('at').to_s [at, Sidekiq.dump_json(hash)] end) else q = payloads.first['queue'] now = Time.now.to_f to_push = payloads.map do |entry| entry['enqueued_at'] = now Sidekiq.dump_json(entry) end conn.sadd('queues', q) conn.lpush("queue:#{q}", to_push) end end
続いて、queueからjobを取り出す処理を見ていきます。
- ソート時みセット型から、
Time.now.to_f.to_s
以下のスコアのjobを取得 - 取得したjobをソート済みセット型から削除を試みる
- 削除が成功すると、取得したjobを即時実行のqueueにpushする
という処理を行なっています
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/scheduled.rb#L11-L33 def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS) # A job's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of jobs with a timestamp before now. Sidekiq.redis do |conn| sorted_sets.each do |sorted_set| # Get the next item in the queue if it's score (time to execute) is <= now. # We need to go through the list one at a time to reduce the risk of something # going wrong between the time jobs are popped from the scheduled queue and when # they are pushed onto a work queue and losing the jobs. while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do # Pop item off the queue and add it to the work queue. If the job can't be popped from # the queue, it's because another process already popped it so we can move on to the # next one. if conn.zrem(sorted_set, job) Sidekiq::Client.push(Sidekiq.load_json(job)) Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" } end end end end end
queueの実装のまとめ
sidekiqでは、即時実行のqueueからjobをpopし、実行するやつはworker、scheduled job のqueue からjobを取得し即時実行のqueueにpushするやつはpooler と呼ばれておりそれぞれ別のスレッドで動いています。
ごちゃごちゃ書きましたが、文章だけだとわかりづらいので図にするとこんな感じです。
scheduled job のパフォーマンス
ようやく本題です。 ある時点に多くの job を schedule すると、scheduled job の性能が悪くなります。 ここでいう性能が悪いというのは「n分後にscheduleしたが、実際に実行されるまでに時間がかかる」ということを意味しています。
処理量が増えるので、1つの poller あたりの処理量が頭打ちになり性能が悪くなるのは当たり前と思うかもしれません。 しかし、この問題は poller を増やしてもあまり改善しません。 「なぜpoller を増やしても改善しないのか、また即時実行の場合はworker を増やせば大量の処理を捌けるのか」ということを簡単な実験をしながら検証していきます。
即時実行の場合
上で説明した通り、即時実行の場合はredisのリスト型の queue にobが積まれており、worker が queue に積まれているjobを取り出し実行します。
簡易的に queue に 10万件の job が積まれていることを以下のように表現します。
require 'redis' redis = Redis.new(host: '127.0.0.1', port: '6379') 100000.times do |i| redis.rpush('hoge', i) end
そして、次のように 並列数(worker の数)を増やしつつ、job を 全て queue から取り出す速度を計測します。
require 'benchmark' require 'redis' require 'connection_pool' number_of_worker = 5 Benchmark.realtime do Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i| redis_pool.with do |conn| while job = conn.rpop('hoge') do end end end end
結果は以下のようになりました。
並列数 | 実行時間 |
---|---|
1 | 21.21864500000038 |
3 | 19.56006800000023 |
5 | 15.638484000000062 |
7 | 13.080714000000171 |
10 | 11.504145999999764 |
並列数が増えるにしたがって、実行時間が短くなっていることがわかります。 このことから、即時実行でパフォーマンスに問題がある場合に、worker の数を増やすことは一定の効果があると考えられます。
scheduled job の場合
scheduled job の場合はソート済みセット型をqueueとして利用しているので、以下のように10万件の job が積まれていることを表現します。
require 'redis' redis = Redis.new(host: '127.0.0.1', port: '6379') 100000.times do |i| redis.zadd('hoge', rand, i) end
そして、同じように 並列数を増やしつつ、job を 全て queue から取り出す速度を計測します。
require 'benchmark' require 'redis' require 'connection_pool' number_of_worker = 5 Benchmark.realtime do Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i| redis_pool.with do |conn| while job = conn.zrangebyscore('hoge', 0, 1, :limit => [0, 1]).first do conn.zrem('hoge', job) end end end end
ポイントは即時実行の場合はqueueから job を取得する処理とqueueから削除する処理が同時にできるのに対し、 scheduled job では取得と削除を別々にやってます。
計測結果は以下のようになりました。
並列数 | 実行時間 |
---|---|
1 | 46.90739100000064 |
3 | 82.63857199999984 |
5 | 125.35556799999995 |
7 | 111.53749299999981 |
10 | 150.0042739999999 |
並列数が増えるに連れて実行時間は長くなる傾向にあるようです。
scheduled job では並列で、queue から job の取得・削除を行うので以下のずのように競合が発生します。 競合が発生すると、片方の poller の処理が無駄になり効率が悪くなってしまいます。 並列数が増えると競合する確率が高くなり、処理の効率が落ち結果的に実行時間が長くなってしまうのかもしれません。
実際にどのくらいの回数競合しているかを調べて見ます。
redis.del 'conflict' Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i| redis_pool.with do |conn| while job = conn.zrangebyscore('hoge', 0, 1, :limit => [0, 1]).first do unless conn.zrem('hoge', job) conn.rpush('conflict', rand) end end end end redis.llen 'conflict'
並列数 | 競合数 |
---|---|
1 | 0 |
3 | 90490 |
5 | 164632 |
7 | 219304 |
10 | 308126 |
並列数が増えるに連れて、競合数も増えていることがわかります。 今回の実験では job数が10万件だったので、単純に10万回処理すれば良いはずですが、並列数10の場合は 308126回も処理が無駄になっています。
もちろん、実際は各poller が queue をポーリングするタイミングはもう少しばらけますが、poller が増えれば競合する確率も高くなるということには変わりありません。
同じ時間帯に大量の job を scheduling していて、実際に実行される時間までラグがあるという場合には、処理を1つのjobにできるだけまとめて、jobの数自体を減らすなどの工夫が必要かもしれません*1。
*1:処理をまとめるかつ冪等なjobを書かないといけないという難しさはありますが...