skaffold で image 作成時に volumes で クレデンシャルを渡そうとしてハマった
Docker image を作成する時プライベートなリポジトリをpullする、s3からなんか引っ張ってくる、などなど Dockerfile 内で認証情報などのクレデンシャルを利用した処理を行いたいことが多々あると思います。
僕はKubernetesのアプリケーションを開発する際に skaffoldというツールを利用しています。
image のビルドもskaffold と Kubernetes クラスタ内で image をビルドするため kaniko というツールを組み合わせて行なっています。
skaffold + kaniko で前述のようにクレデンシャルを利用する処理を含んだ Dockerfile からイメージを作成しようとしてハマった事を記録します。
今回は GitHub Packages に公開された npm package を取得する例を元に説明します。
npm package を取得するための認証は以下の内容を `.npmrc` ファイルとして保存する事で可能です。
//npm.pkg.github.com/:_authToken=TOKEN
kaniko を利用してimageをビルドする場合、Kubernetes クラスタ上に kaniko pod が作成され pod上でビルドを行います。
ということで kaniko pod 作成時に 上記の .npmrc を 渡してあげると良さそうです。
僕は .npmrc を secret に保存し Volumes でマウントさせる方法を試しました。
skaffold を利用する場合、諸々の設定を skaffold.yaml に書きます。
skaffold.yaml のリファレンスは以下です。
リファレンスを参考に以下のような skaffold.yaml を用意しました。
apiVersion: skaffold/v2beta10 kind: Config build: artifacts: - image: gcr.io/k8s-skaffold/example context: . kaniko: volumeMounts: - name: npmrc mountPath: "/root/" cluster: pullSecretName: kaniko-secret volumes: - name: npmrc secret: secretName: npmrc
npmrc という名前のた secret を kaniko pod の /root/ にマウントしています。
secret は以下のような感じなので kaniko-pod に /root/.npmrc がマウントされます。
Name: npmrc Namespace: default Labels: <none> Annotations: <none> Type: Opaque続きを読む
ngx_http_userid_module の uid_got/uid_set と cookie に書き込まれる uid について
ngx_http_userid_module を使うと nginx でクライアントを識別するための 識別子 を cookie に焼くことができる。
また、cookie に書き込んだあたいは $uid_got/$uid_set の組み込み変数で参照可能でログなどに記録することができる。
例えば、以下のような conf で nginx を起動し、適当にアクセスした場合のログを見てみる
user nginx; worker_processes 1; error_log /var/log/nginx/error.log warn; pid /var/run/nginx.pid; events { worker_connections 1024; } http { include /etc/nginx/mime.types; default_type application/octet-stream; userid on; userid_name uid; userid_path /; userid_expires max; log_format main "time:$time_local\t" 'req:$request\t' 'cookie:$http_cookie\t' 'set_cookie:$sent_http_set_cookie\t' 'uid_got:$uid_got\t' 'uid_set:$uid_set'; access_log /var/log/nginx/access.log main; sendfile on; include /etc/nginx/conf.d/*.conf; }
ログはこちらになる
time:21/Aug/2020:03:16:37 +0000 req:GET / HTTP/1.1 cookie:- set_cookie:uid=rBEAAl8/PJUlwAAGAwMEAg==; expires=Thu, 31-Dec-37 23:55:55 GMT; path=/ uid_got:- uid_set:uid=020011AC953C3F5F0600C02502040303
set_cookie と uid_set(初回はuid_set に値が入って次回以降はuid_gotに値が入る)を見る
set_cookie:uid=rBEAAl8/PJUlwAAGAwMEAg==; expires=Thu, 31-Dec-37 23:55:55 GMT; path=/ uid_set: uid=020011AC953C3F5F0600C02502040303
cookie に書き込まれている識別子は rBEAAl8/PJUlwAAGAwMEAg==
だが uid_set で参照できる識別子は 020011AC953C3F5F0600C02502040303
で一致しない。
この辺りを眺めるとどうやら uid_set に対してごにょごにょした値が cookie にセットされるようだった。
uid_set の値から cookie に書き込んでいる値への変換処理はおそらくこの辺り。 ソースコードは正直何やっているのかちゃんと追えなかったのでメーリングリストに書いてある処理で cookie の値から uid_set の値に変換してみる
[1] pry(main)> require 'base64' => true [2] pry(main)> Base64.decode64("rBEAAl8/PJUlwAAGAwMEAg==").unpack("L*").map{|a| sprintf("%08X", a)}.join.upcase => "020011AC953C3F5F0600C02502040303"
ちゃんと変換できた。 sprintf("%08X", a)
が味噌で8桁で揃えないと先頭が0の場合にデータがかけて微妙に違うデータになる。
これは uid_set が uint32_t で定義されているからだと思う
逆のことをやれば、 uid_set から cookie の値に変換することもできる。
[4] pry(main)> Base64.encode64("020011AC953C3F5F0600C02502040303".downcase.scan(/.{1,8}/).map{|a| a.to_i(16)}.pack("l*")) => "rBEAAl8/PJUlwAAGAwMEAg==\n"
ngx_mruby の Nginx::Async::HTTP でハマったとこ
ngx_mruby にはノンブロッキングで httpリクエストできるNginx::Async::HTTP
というものが用意されている。
外部からリクエストを受け付ける nginx(便宜上frontとする) とfrontからリクエストを受け付けるnginx(便宜上apiとする)を用意して Nginx::Async::HTTP
を試してみた。
front の設定
user nginx; worker_processes 1; load_module modules/ndk_http_module.so; load_module modules/ngx_http_mruby_module.so; events { worker_connections 1024; } master_process off; http { include mime.types; server { listen 80; server_name localhost; location /sub_req_proxy_pass { proxy_pass http://nginx-api:8080/api; } location /front { mruby_rewrite_handler_code ' Nginx::Async::HTTP.sub_request "/sub_req_proxy_pass" res = Nginx::Async::HTTP.last_response Nginx.rputs res.body '; } } }
api の設定
user nginx; worker_processes 1; load_module modules/ndk_http_module.so; load_module modules/ngx_http_mruby_module.so; events { worker_connections 1024; } master_process off; http { include mime.types; server { listen 8080; server_name localhost; location /api { mruby_content_handler_code ' Nginx.echo "Hello World" '; } } }
上記の設定の nginx を立てて front にリクエストしたところ、レスポンスが返ってこなかった。
$ curl localhost:8000/front curl: (52) Empty reply from server
ngx_mruby/nginx.conf at master · matsumotory/ngx_mruby · GitHub
この辺りを眺めているとサブリクエストでは mruby_output_body_filter
ディレクティブをつけているので以下のようにサブリクエストのlocationディレクティブに mruby_output_body_filter
ディレクティブを追加するとレスポンスが返ってくるようになった
location /sub_req_proxy_pass { proxy_pass http://nginx-api:8080/api; mruby_output_body_filter_code ''; }
$ curl localhost:8000/front Hello World
知識不足でなぜ mruby_output_body_filter
ディレクティブ が必要なのかは理解できていないが、Nginx::Async::HTTP
を利用してサブリクエストを行う場合、サブリクエストのlocationディレクティブで mruby_output_body_filter
ディレクティブが必須のようだった
ActiveAdmin のフィルターでAND検索する
ActiveAdmin の filter では Ransack を利用することができる。
例えば以下のように定義してあげると、 nameカラムに対して LIKE検索することができる。
ActiveAdmin.register User do fitler :name_matches end
Ransackでは cont_allを利用することで、特定のカラムを複数の単語でAND検索することができる。
では、ActiveAdminでフォームにスペース区切りで複数の単語を入力した場合にnameカラムをAND検索したい場合はどうすれば良いだろうか
ActiveAdmin.register User do fitler :name_cont_all end
これまでの説明であれば、なんとなく上記のようなfilterを定義してあげるとできそうな気がする。 しかし上記のfilter を定義しフォームに「hoge fuga」を入力し検索を行うと以下のようなクエリが発行された。
SELECT COUNT(*) FROM (SELECT 1 AS one FROM `users` WHERE `users`.`deleted_at` IS NULL AND (`users`.`name` LIKE '%hoge fuga%') LIMIT 30 OFFSET 0) subquery_for_count
「hoge fuga」を1つの単語としてLIKE検索しているのである。 これは Ransack側は単語を配列で渡されることを期待しているがActiveAdminは素直に文字列で渡しているためである。
AND検索するためにはフォームに入力した文字列が、Rnsackに渡る前に配列に変換すれば良い。
例えば、以下のように before_action で文字列を配列に変換するなどが考えられる。
ActiveAdmin.register User do fitler :name_cont_all controller do before_action :adjust_search_params, only: :index private def adjust_search_params return if params[:q].nil? || params[:q][:name_cont_all].nil? params[:q][:name_cont_all] = params[:q][:name_cont_all].split(/[[:blank:]]+/) end end end
ちなみにActiveAdmin側でスペース区切りで入力された単語を配列に分割するようにPull Requestが投げられているようだががマージには至っていない。
warden のコードリーディング
warden gem のコードリーディングしたのでメモ
warden とは
rack ベースの認証フレームワーク。 devise でお馴染みの gem ですね。
rack middleware の登録
warden は rack middleware として動作します。 rails とかだと config/application.rb
あたりで以下のように rack middleware に登録することで warden を利用することができます。
# https://github.com/wardencommunity/warden/wiki/Setup module Hoge class Application < Rails::Application config.use Warden::Manager do |manager| manager.default_strategies :password manager.failure_app = BadAuthenticationEndsUpHere end end end
また、以下のように認証で利用する strategy の実態(認証処理)を登録してあげる必要があります。
Warden::Strategies.add
で実態を登録した場合、Warden::Strategies::Base
のサブクラスとして実態が定義されます。
# https://github.com/wardencommunity/warden/wiki/Strategies Warden::Strategies.add(:password) do def valid? params['username'] || params['password'] end def authenticate! u = User.authenticate(params['username'], params['password']) u.nil? ? fail!("Could not log in") : success!(u) end end
ブロック内で行なっている処理を理解するためにまずは Warden::Manager を読んでいきます
Warden::Manager の initializer は以下のようになっています。
# https://github.com/wardencommunity/warden/blob/master/lib/warden/manager.rb#L19-L25 def initialize(app, options={}) default_strategies = options.delete(:default_strategies) @app, @config = app, Warden::Config.new(options) @config.default_strategies(*default_strategies) if default_strategies yield @config if block_given? end
Warden::Config
のインスタンスを生成し @config
に代入。
ブロックが与えられていれば @config
を引数にブロックを実行しています。
middleware 登録の時のブロック引数 manager
の正体は Warden::Config
のインスタンスでした。
続いて ブロック内でやっている manager.default_strategies
について理解するために Warden::Config を読みます。
# https://github.com/wardencommunity/warden/blob/master/lib/warden/config.rb#L63-L70 def default_strategies(*strategies) opts = Hash === strategies.last ? strategies.pop : {} hash = self[:default_strategies] scope = opts[:scope] || :_all hash[scope] = strategies.flatten unless strategies.empty? hash[scope] || hash[:_all] || [] end
引数 strategies を配列で受け取り、最後の要素が hash の場合は それを opts に代入しています。
続いて hash = self[:default_strategies]
でデフォルトのstrategyを取得しています。
hash = self[:default_strategies]
という記法はすこし奇妙に感じますが、Warden::Config が Hash Class のサブクラスだからこのように書けるのですね。
最後に opts
から scope を取得し、 hash[scope]
に引数 strategies を代入しています。
続いて manager.failure_app = BadAuthenticationEndsUpHere
の処理を見ていきます。
# https://github.com/wardencommunity/warden/blob/master/lib/warden/config.rb#L21-L35 def self.hash_accessor(*names) #:nodoc: names.each do |name| class_eval <<-METHOD, __FILE__, __LINE__ + 1 def #{name} self[:#{name}] end def #{name}=(value) self[:#{name}] = value end METHOD end end hash_accessor :failure_app, :default_scope, :intercept_401
hash_accessor により Warden::Config#failure_app=
が動的に定義されていて、self[:failure_app] に代入をしています。
ここまでで、 冒頭のようにmiddleware を登録すると Warden::Manager
の @config
は以下の内容になっていることがわかります
# 簡易化のためhash 形式かつ default_strategiesとfailure_app 以外のキーを除外しています { default_strategies: { _all: [:password] }, failure_app: BadAuthenticationEndsUpHere }
middleware の呼び出し
rack middleware は callメソッドによって呼び出されます。早速 Warden::Manager
の call メソッドを読んでみます。
# https://github.com/wardencommunity/warden/blob/master/lib/warden/manager.rb#L30-L48 def call(env) # :nodoc: return @app.call(env) if env['warden'] && env['warden'].manager != self env['warden'] = Proxy.new(env, self) result = catch(:warden) do env['warden'].on_request @app.call(env) end result ||= {} case result when Array handle_chain_result(result.first, result, env) when Hash process_unauthenticated(env, result) when Rack::Response handle_chain_result(result.status, result, env) end end
env['warden']
に Warden::Proxy
のインスタンスを代入しています。
なぜ env にいれているかというと、この rack middleware の外からenv['warden'] を参照したいからだと思われます。
catch(:warden)
で後続のブロック内の処理から大域脱出をできるようにしています。
ブロックの中を見ていきます。
env['warden'].on_request
では詳細省略しますが、warden では以下のようにon_requestコールバックを登録することができ、on_requestコールバックが登録されていればここで実行されます。
Warden::Manager.on_request do |proxy| ... end
コールバックを実行したら後続のmiddlewareおよび rack アプリケーションを実行しています。
認証処理
ここまでで、 Warden::Manager middlewareでは 認証方法(strategy)の指定や env['warden'] に Warden::Proxy のインスタンスを代入するなどの初期化処理をしていることがわかりました。
続いてrack アプリケーション内で実際に認証をする方法を見ていきます。
warden ではアプリケーションの任意の場所で以下のようにすることで認証を行うことができます。
env['warden'].authenticate!
env['warden'] は Warden::Proxy のインスタンスでしたね。では Warden::Proxy#authenticate!
を見ていきましょう
# https://github.com/wardencommunity/warden/blob/master/lib/warden/proxy.rb#L132-L136 def authenticate!(*args) user, opts = _perform_authentication(*args) throw(:warden, opts) unless user user end
_perform_authentication
を実行して戻り値 user が存在しなければ throw(:warden, opts)
で大域脱出。user が存在すれば そのまま user を返しています。
_perform_authentication
を見ましょう
# https://github.com/wardencommunity/warden/blob/master/lib/warden/proxy.rb#L328-L343 def _perform_authentication(*args) scope, opts = _retrieve_scope_and_opts(args) user = nil # Look for an existing user in the session for this scope. # If there was no user in the session, see if we can get one from the request. return user, opts if user = user(opts.merge(:scope => scope)) _run_strategies_for(scope, args) if winning_strategy && winning_strategy.successful? opts[:store] = opts.fetch(:store, winning_strategy.store?) set_user(winning_strategy.user, opts.merge!(:event => :authentication)) end [@users[scope], opts] end
_retrieve_scope_and_opts
は @config
から scope と そのscope に対応する optionを取り出しているようです。
middleware 登録時に何も指定していない場合は scope
は :default
それに対応するオプションは 空のhash になります。
続いて user メソッドを opts を引数に呼び出しています。
冒頭のようにmiddleware を登録した場合だと opts の中身は {scope: :default}
になります。
user メソッドも割愛しますが、@users に該当 scope のキャッシュがいればそれを返し、なければ session の取り出しを試みます。複数 authenticate!
が呼ばれてもパフォーマンスが劣化しないようにキャッシュしたり、一度認証が通ってsession が作成されればsessionが残っていれば再認証する必要がないのでこの処理を挟んでいるようです。
user メソッド を呼び出しても user が存在しなければ後続の _run_strategies_for(scope, args)
に続きます。
# https://github.com/wardencommunity/warden/blob/master/lib/warden/proxy.rb#L353-L376 def _run_strategies_for(scope, args) #:nodoc: self.winning_strategy = @winning_strategies[scope] return if winning_strategy && winning_strategy.halted? # Do not run any strategy if locked return if @locked if args.empty? defaults = @config[:default_strategies] strategies = defaults[scope] || defaults[:_all] end (strategies || args).each do |name| strategy = _fetch_strategy(name, scope) next unless strategy && !strategy.performed? && strategy.valid? catch(:warden) do _update_winning_strategy(strategy, scope) end strategy._run! _update_winning_strategy(strategy, scope) break if strategy.halted? end end
winning_strategy
は恐らく最後に呼び出したstrategyです。初回は nil にっているはずです。
args は env['warden'].authenticate!
の引数でしたね。
今回の場合 authenticate! には何も渡してないので args は空です。
argsが空の場合、@config からstrategies を取り出します
「 rack middleware の登録」の箇所に書いた通り@configは以下のようになっているので、今回の場合 strategies は [:password, :basic]
になります
{ default_strategies: { _all: [:password] }, failure_app: BadAuthenticationEndsUpHere }
続いて取得した strategies をループ回して _fetch_strategy(name, scope)
で strategyの実態の取り出し、
strategy._run!
でstrategyの実行を行なっています。
# https://github.com/wardencommunity/warden/blob/master/lib/warden/proxy.rb#L379-L387 _fetch_strategy(name, scope) def _fetch_strategy(name, scope) @strategies[scope][name] ||= if klass = Warden::Strategies[name] klass.new(@env, scope) elsif @config.silence_missing_strategies? nil else raise "Invalid strategy #{name}" end end
冒頭で登録した strategy の実態を取り出し、そのインスタンスが返されています。
strategy の実態が定義されていない場合は 例外が返されます。
strategy を取得できたら strategy._run!
を呼び出します。
strategy は Warden::Strategies::Base
のサブクラスとして定義されているので Warden::Strategies::Base#_run!
のを見ます。
# https://github.com/wardencommunity/warden/blob/master/lib/warden/strategies/base.rb#L52-L56 def _run! # :nodoc: @performed = true authenticate! self end
@performed
にして authenticate! を呼び出していますね。strategy の authenticate!
メソッドは 冒頭で strategy の実態を登録する際に定義した認証処理でしたね。
authenticate! では任意の認証処理を実行して 失敗した場合は fail!
成功した場合は success!(user)
を呼び出しています。
# https://github.com/wardencommunity/warden/blob/master/lib/warden/strategies/base.rb#L144-L147 def fail!(message = "Failed to Login") halt! @message = message @result = :failure end # https://github.com/wardencommunity/warden/blob/master/lib/warden/strategies/base.rb#L125-L130 def success!(user, message = nil) halt! @user = user @message = message @result = :success end
fail!
の場合は helt!
で @helted
を true にして @result
に :failure
を代入しています。
@helted
が true の場合だと後続のstrategy が実行されないので、後続のstrategy もトライする場合は fail!
ではなくfail
で呼び出す必要があります。
success!
は helt!
を実行し、 @user
に認証によって取得することができた user を代入しています。
strategy._run!
で認証処理を実行した後は_update_winning_strategy(strategy, scope)
でwinning_strategyを直前に実行したstrategyに更新します。その後strategy.helted?
を確認して true だと 認証が終了したとみなしループを抜けます。
_run_strategies_for
の処理を見たので _perform_authentication
に戻ります。
_run_strategies_for
が終了すると winning_strategy.successful? で直前に実行された認証が成功しているか確認します。
成功していれば set_user(winning_strategy.user, opts.merge!(:event => :authentication))
を実行します。
# https://github.com/wardencommunity/warden/blob/master/lib/warden/proxy.rb#L170-L194 def set_user(user, opts = {}) scope = (opts[:scope] ||= @config.default_scope) # Get the default options from the master configuration for the given scope opts = (@config[:scope_defaults][scope] || {}).merge(opts) opts[:event] ||= :set_user @users[scope] = user if opts[:store] != false && opts[:event] != :fetch options = env[ENV_SESSION_OPTIONS] if options if options.frozen? env[ENV_SESSION_OPTIONS] = options.merge(:renew => true).freeze else options[:renew] = true end end session_serializer.store(user, scope) end run_callbacks = opts.fetch(:run_callbacks, true) manager._run_callbacks(:after_set_user, user, self, opts) if run_callbacks @users[scope] end
set_user では @usersに認証で取得できたuser のデータをキャッシュし、sessionに書き込んでいます。
set_user
の処理が終わると [@users[scope], opts]
を返して _perform_authentication
の処理は終わりです。
_perform_authentication
が終わったので authenticate!
に戻ります。
だいぶ離れたのでコードをいかに再掲
def authenticate!(*args) user, opts = _perform_authentication(*args) throw(:warden, opts) unless user user end
_perform_authentication は strategy で定義した認証が成功したら user は認証によって取得されたデータ。失敗したら nilが返ってきました。
認証が成功した場合は そのまま user を返して終わりです。
認証が失敗した場合は、認証が失敗した場合は throw(:warden, opts)
で大域脱出をはかり、
Warden::Manager#call まで処理が戻ります。以下再掲
def call(env) # :nodoc: return @app.call(env) if env['warden'] && env['warden'].manager != self env['warden'] = Proxy.new(env, self) result = catch(:warden) do env['warden'].on_request @app.call(env) end result ||= {} case result when Array handle_chain_result(result.first, result, env) when Hash process_unauthenticated(env, result) when Rack::Response handle_chain_result(result.status, result, env) end end
認証に成功した場合は result は@app.call(env)
の戻り値になります。rack middleware の場合は基本的に Array になります。
認証に失敗した場合は throw(:warden, opts)
の opts がresult に入ります。opts は Hash です。
認証が正常に終了した場合は result は Array のはずなので handle_chain_result を実行します。 handle_chain_result は基本的に result をそのまま返しています。
認証が失敗した場合は result は Hash なので process_unauthenticated(env, result)
を実行します。
process_unauthenticated
では 基本的に 冒頭で登録した failure_app の call メソッドを呼び出します。
failure_app をカスタマイズすることで、認証失敗時の処理を細かくカスタマイズすることができます。
終わりに
長くなったかつ最後の方はちょっと力尽きた感ありましたが、warden のコードを読む機会があったのでメモしておきます
社内でElasticsearch勉強会を開いた
sidekiq の scheduled jobの性能
sidekiq と scheduled job
ruby でよく使われるジョブキューにsidekiqというものがあります。 Rails などでは Active Job のバックエンドとして使うこともできます。
sidekiq は即時的に処理を行うだけではなく、Scheduled Jobという機能があり、例えば以下のようにすると1時間後にjobを実行することができます。
MyJob.perform_in(1.hour) # active job MyJob.set(wait: 1.hour).perform_later
また、即時で実行したい場合には以下のようにします
MyJob.perform_async # active job MyJob.perform_later
sidekiq の queue の実装
sidekiqでは、queueにredisを使用しており、即時実行用の queue と scheduled job 用のqueueは異なった実装になっています。 それぞれについて、どのようにjobを溜めているのか、どのようにjobを取り出しているのかをみていきます。
即時実行
即時実行の場合、 redisのリストに
"queue:#{queue_name}"
というkeyでjobがpushされていきます。コードはこの辺です。
キューから取り出すときも単純で、単にリストからpopしているだけです。
scheduled job
scheduled job はソート済みセット型を使用しています。 n時間後に実行の場合、以下のようにtimeオブジェクトを浮動小数点数への変換を行い、変換した値をソート済みセット型のスコアとして使用しています。
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/worker.rb#L55-L66 def perform_in(interval, *args) int = interval.to_f now = Time.now.to_f ts = (int < 1_000_000_000 ? now + int : int) payload = @opts.merge('class' => @klass, 'args' => args, 'at' => ts) # Optimization to enqueue something now that is scheduled to go out now or in the past payload.delete('at') if ts <= now @klass.client_push(payload) end alias_method :perform_at, :perform_in end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/worker.rb#L136-L144 def client_push(item) # :nodoc: pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool # stringify item.keys.each do |key| item[key.to_s] = item.delete(key) end Sidekiq::Client.new(pool).push(item) end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/client.rb#L69-L77 def push(item) normed = normalize_item(item) payload = process_single(item['class'], normed) if payload raw_push([payload]) payload['jid'] end end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/client.rb#L181-L206 def raw_push(payloads) @redis_pool.with do |conn| conn.multi do atomic_push(conn, payloads) end end true end def atomic_push(conn, payloads) if payloads.first['at'] conn.zadd('schedule', payloads.map do |hash| at = hash.delete('at').to_s [at, Sidekiq.dump_json(hash)] end) else q = payloads.first['queue'] now = Time.now.to_f to_push = payloads.map do |entry| entry['enqueued_at'] = now Sidekiq.dump_json(entry) end conn.sadd('queues', q) conn.lpush("queue:#{q}", to_push) end end
続いて、queueからjobを取り出す処理を見ていきます。
- ソート時みセット型から、
Time.now.to_f.to_s
以下のスコアのjobを取得 - 取得したjobをソート済みセット型から削除を試みる
- 削除が成功すると、取得したjobを即時実行のqueueにpushする
という処理を行なっています
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/scheduled.rb#L11-L33 def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS) # A job's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of jobs with a timestamp before now. Sidekiq.redis do |conn| sorted_sets.each do |sorted_set| # Get the next item in the queue if it's score (time to execute) is <= now. # We need to go through the list one at a time to reduce the risk of something # going wrong between the time jobs are popped from the scheduled queue and when # they are pushed onto a work queue and losing the jobs. while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do # Pop item off the queue and add it to the work queue. If the job can't be popped from # the queue, it's because another process already popped it so we can move on to the # next one. if conn.zrem(sorted_set, job) Sidekiq::Client.push(Sidekiq.load_json(job)) Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" } end end end end end
queueの実装のまとめ
sidekiqでは、即時実行のqueueからjobをpopし、実行するやつはworker、scheduled job のqueue からjobを取得し即時実行のqueueにpushするやつはpooler と呼ばれておりそれぞれ別のスレッドで動いています。
ごちゃごちゃ書きましたが、文章だけだとわかりづらいので図にするとこんな感じです。
scheduled job のパフォーマンス
ようやく本題です。 ある時点に多くの job を schedule すると、scheduled job の性能が悪くなります。 ここでいう性能が悪いというのは「n分後にscheduleしたが、実際に実行されるまでに時間がかかる」ということを意味しています。
処理量が増えるので、1つの poller あたりの処理量が頭打ちになり性能が悪くなるのは当たり前と思うかもしれません。 しかし、この問題は poller を増やしてもあまり改善しません。 「なぜpoller を増やしても改善しないのか、また即時実行の場合はworker を増やせば大量の処理を捌けるのか」ということを簡単な実験をしながら検証していきます。
即時実行の場合
上で説明した通り、即時実行の場合はredisのリスト型の queue にobが積まれており、worker が queue に積まれているjobを取り出し実行します。
簡易的に queue に 10万件の job が積まれていることを以下のように表現します。
require 'redis' redis = Redis.new(host: '127.0.0.1', port: '6379') 100000.times do |i| redis.rpush('hoge', i) end
そして、次のように 並列数(worker の数)を増やしつつ、job を 全て queue から取り出す速度を計測します。
require 'benchmark' require 'redis' require 'connection_pool' number_of_worker = 5 Benchmark.realtime do Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i| redis_pool.with do |conn| while job = conn.rpop('hoge') do end end end end
結果は以下のようになりました。
並列数 | 実行時間 |
---|---|
1 | 21.21864500000038 |
3 | 19.56006800000023 |
5 | 15.638484000000062 |
7 | 13.080714000000171 |
10 | 11.504145999999764 |
並列数が増えるにしたがって、実行時間が短くなっていることがわかります。 このことから、即時実行でパフォーマンスに問題がある場合に、worker の数を増やすことは一定の効果があると考えられます。
scheduled job の場合
scheduled job の場合はソート済みセット型をqueueとして利用しているので、以下のように10万件の job が積まれていることを表現します。
require 'redis' redis = Redis.new(host: '127.0.0.1', port: '6379') 100000.times do |i| redis.zadd('hoge', rand, i) end
そして、同じように 並列数を増やしつつ、job を 全て queue から取り出す速度を計測します。
require 'benchmark' require 'redis' require 'connection_pool' number_of_worker = 5 Benchmark.realtime do Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i| redis_pool.with do |conn| while job = conn.zrangebyscore('hoge', 0, 1, :limit => [0, 1]).first do conn.zrem('hoge', job) end end end end
ポイントは即時実行の場合はqueueから job を取得する処理とqueueから削除する処理が同時にできるのに対し、 scheduled job では取得と削除を別々にやってます。
計測結果は以下のようになりました。
並列数 | 実行時間 |
---|---|
1 | 46.90739100000064 |
3 | 82.63857199999984 |
5 | 125.35556799999995 |
7 | 111.53749299999981 |
10 | 150.0042739999999 |
並列数が増えるに連れて実行時間は長くなる傾向にあるようです。
scheduled job では並列で、queue から job の取得・削除を行うので以下のずのように競合が発生します。 競合が発生すると、片方の poller の処理が無駄になり効率が悪くなってしまいます。 並列数が増えると競合する確率が高くなり、処理の効率が落ち結果的に実行時間が長くなってしまうのかもしれません。
実際にどのくらいの回数競合しているかを調べて見ます。
redis.del 'conflict' Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i| redis_pool.with do |conn| while job = conn.zrangebyscore('hoge', 0, 1, :limit => [0, 1]).first do unless conn.zrem('hoge', job) conn.rpush('conflict', rand) end end end end redis.llen 'conflict'
並列数 | 競合数 |
---|---|
1 | 0 |
3 | 90490 |
5 | 164632 |
7 | 219304 |
10 | 308126 |
並列数が増えるに連れて、競合数も増えていることがわかります。 今回の実験では job数が10万件だったので、単純に10万回処理すれば良いはずですが、並列数10の場合は 308126回も処理が無駄になっています。
もちろん、実際は各poller が queue をポーリングするタイミングはもう少しばらけますが、poller が増えれば競合する確率も高くなるということには変わりありません。
同じ時間帯に大量の job を scheduling していて、実際に実行される時間までラグがあるという場合には、処理を1つのjobにできるだけまとめて、jobの数自体を減らすなどの工夫が必要かもしれません*1。
*1:処理をまとめるかつ冪等なjobを書かないといけないという難しさはありますが...