skaffold で image 作成時に volumes で クレデンシャルを渡そうとしてハマった

Docker image を作成する時プライベートなリポジトリをpullする、s3からなんか引っ張ってくる、などなど Dockerfile 内で認証情報などのクレデンシャルを利用した処理を行いたいことが多々あると思います。


僕はKubernetesのアプリケーションを開発する際に skaffoldというツールを利用しています。
image のビルドもskaffold と Kubernetes クラスタ内で image をビルドするため kaniko というツールを組み合わせて行なっています。

skaffold + kaniko で前述のようにクレデンシャルを利用する処理を含んだ Dockerfile からイメージを作成しようとしてハマった事を記録します。


今回は GitHub Packages に公開された npm package を取得する例を元に説明します。
npm package を取得するための認証は以下の内容を `.npmrc` ファイルとして保存する事で可能です。

docs.github.com

//npm.pkg.github.com/:_authToken=TOKEN


kaniko を利用してimageをビルドする場合、Kubernetes クラスタ上に kaniko pod が作成され pod上でビルドを行います。
ということで kaniko pod 作成時に 上記の .npmrc を 渡してあげると良さそうです。

僕は .npmrc を secret に保存し Volumes でマウントさせる方法を試しました。

skaffold を利用する場合、諸々の設定を skaffold.yaml に書きます。

skaffold.yaml のリファレンスは以下です。

skaffold.yaml | Skaffold

リファレンスを参考に以下のような skaffold.yaml を用意しました。

apiVersion: skaffold/v2beta10
kind: Config
build:
  artifacts:
  - image: gcr.io/k8s-skaffold/example
    context: .
    kaniko:
      volumeMounts:
      - name: npmrc
        mountPath: "/root/"
  cluster:
    pullSecretName: kaniko-secret
    volumes:
      - name: npmrc
        secret:
          secretName: npmrc

npmrc という名前のた secret を kaniko pod の /root/ にマウントしています。
secret は以下のような感じなので kaniko-pod に /root/.npmrc がマウントされます。

Name:        npmrc
Namespace:    default
Labels:       <none>
Annotations:  <none>

Type:  Opaque
続きを読む

ngx_http_userid_module の uid_got/uid_set と cookie に書き込まれる uid について

ngx_http_userid_module を使うと nginx でクライアントを識別するための 識別子 を cookie に焼くことができる。

また、cookie に書き込んだあたいは $uid_got/$uid_set の組み込み変数で参照可能でログなどに記録することができる。

例えば、以下のような conf で nginx を起動し、適当にアクセスした場合のログを見てみる

user  nginx;
worker_processes  1;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

events {
    worker_connections  1024;
}


http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    userid          on;
    userid_name     uid;
    userid_path     /;
    userid_expires  max;

    log_format  main  "time:$time_local\t"
                      'req:$request\t'
                      'cookie:$http_cookie\t'
                      'set_cookie:$sent_http_set_cookie\t'
                      'uid_got:$uid_got\t'
                      'uid_set:$uid_set';

    access_log  /var/log/nginx/access.log  main;

    sendfile        on;
    include /etc/nginx/conf.d/*.conf;
}

ログはこちらになる

time:21/Aug/2020:03:16:37 +0000 req:GET / HTTP/1.1      cookie:-        set_cookie:uid=rBEAAl8/PJUlwAAGAwMEAg==; expires=Thu, 31-Dec-37 23:55:55 GMT; path=/    uid_got:-       uid_set:uid=020011AC953C3F5F0600C02502040303

set_cookie と uid_set(初回はuid_set に値が入って次回以降はuid_gotに値が入る)を見る

set_cookie:uid=rBEAAl8/PJUlwAAGAwMEAg==; expires=Thu, 31-Dec-37 23:55:55 GMT; path=/

uid_set: uid=020011AC953C3F5F0600C02502040303

cookie に書き込まれている識別子は rBEAAl8/PJUlwAAGAwMEAg== だが uid_set で参照できる識別子は 020011AC953C3F5F0600C02502040303 で一致しない。

この辺りを眺めるとどうやら uid_set に対してごにょごにょした値が cookie にセットされるようだった。

uid_set の値から cookie に書き込んでいる値への変換処理はおそらくこの辺りソースコードは正直何やっているのかちゃんと追えなかったのでメーリングリストに書いてある処理で cookie の値から uid_set の値に変換してみる

[1] pry(main)> require 'base64'
=> true
[2] pry(main)> Base64.decode64("rBEAAl8/PJUlwAAGAwMEAg==").unpack("L*").map{|a| sprintf("%08X", a)}.join.upcase
=> "020011AC953C3F5F0600C02502040303"

ちゃんと変換できた。 sprintf("%08X", a) が味噌で8桁で揃えないと先頭が0の場合にデータがかけて微妙に違うデータになる。 これは uid_set が uint32_t で定義されているからだと思う

逆のことをやれば、 uid_set から cookie の値に変換することもできる。

[4] pry(main)> Base64.encode64("020011AC953C3F5F0600C02502040303".downcase.scan(/.{1,8}/).map{|a|  a.to_i(16)}.pack("l*"))
=> "rBEAAl8/PJUlwAAGAwMEAg==\n"

ngx_mruby の Nginx::Async::HTTP でハマったとこ

ngx_mruby にはノンブロッキングhttpリクエストできるNginx::Async::HTTPというものが用意されている。

外部からリクエストを受け付ける nginx(便宜上frontとする) とfrontからリクエストを受け付けるnginx(便宜上apiとする)を用意して Nginx::Async::HTTP を試してみた。

front の設定

user  nginx;
worker_processes  1;
load_module modules/ndk_http_module.so;
load_module modules/ngx_http_mruby_module.so;

events {
    worker_connections  1024;
}

master_process off;

http {
    include       mime.types;

    server {
        listen       80;
        server_name  localhost;

        location /sub_req_proxy_pass {
            proxy_pass http://nginx-api:8080/api;
        }

        location /front {
            mruby_rewrite_handler_code '
              Nginx::Async::HTTP.sub_request "/sub_req_proxy_pass"
              res = Nginx::Async::HTTP.last_response
              Nginx.rputs res.body
            ';
        }
    }
}

api の設定

user  nginx;
worker_processes  1;
load_module modules/ndk_http_module.so;
load_module modules/ngx_http_mruby_module.so;

events {
    worker_connections  1024;
}

master_process off;

http {
    include       mime.types;

    server {
        listen       8080;
        server_name  localhost;

        location /api {
            mruby_content_handler_code '
              Nginx.echo "Hello World"
            ';
        }
    }
}

上記の設定の nginx を立てて front にリクエストしたところ、レスポンスが返ってこなかった。

$ curl localhost:8000/front
curl: (52) Empty reply from server

ngx_mruby/nginx.conf at master · matsumotory/ngx_mruby · GitHub

この辺りを眺めているとサブリクエストでは mruby_output_body_filter ディレクティブをつけているので以下のようにサブリクエストのlocationディレクティブに mruby_output_body_filter ディレクティブを追加するとレスポンスが返ってくるようになった

location /sub_req_proxy_pass {
    proxy_pass http://nginx-api:8080/api;
    mruby_output_body_filter_code '';
}
$ curl localhost:8000/front
Hello World

知識不足でなぜ mruby_output_body_filterディレクティブ が必要なのかは理解できていないが、Nginx::Async::HTTP を利用してサブリクエストを行う場合、サブリクエストのlocationディレクティブで mruby_output_body_filter ディレクティブが必須のようだった

ActiveAdmin のフィルターでAND検索する

ActiveAdmin の filter では Ransack を利用することができる。

例えば以下のように定義してあげると、 nameカラムに対して LIKE検索することができる。

ActiveAdmin.register User do
fitler :name_matches

end

Ransackでは cont_allを利用することで、特定のカラムを複数の単語でAND検索することができる。

では、ActiveAdminでフォームにスペース区切りで複数の単語を入力した場合にnameカラムをAND検索したい場合はどうすれば良いだろうか

ActiveAdmin.register User do
fitler :name_cont_all

end

これまでの説明であれば、なんとなく上記のようなfilterを定義してあげるとできそうな気がする。 しかし上記のfilter を定義しフォームに「hoge fuga」を入力し検索を行うと以下のようなクエリが発行された。

SELECT COUNT(*) FROM (SELECT 1 AS one FROM `users` WHERE `users`.`deleted_at` IS NULL AND (`users`.`name` LIKE '%hoge fuga%') LIMIT 30 OFFSET 0) subquery_for_count

hoge fuga」を1つの単語としてLIKE検索しているのである。 これは Ransack側は単語を配列で渡されることを期待しているActiveAdminは素直に文字列で渡しているためである。

AND検索するためにはフォームに入力した文字列が、Rnsackに渡る前に配列に変換すれば良い。

例えば、以下のように before_action で文字列を配列に変換するなどが考えられる。

ActiveAdmin.register User do
  fitler :name_cont_all
  
  controller do
    before_action :adjust_search_params, only: :index
  
    private
  
    def adjust_search_params
      return if params[:q].nil? || params[:q][:name_cont_all].nil?
  
      params[:q][:name_cont_all] = params[:q][:name_cont_all].split(/[[:blank:]]+/)
    end
  end
end

ちなみにActiveAdmin側でスペース区切りで入力された単語を配列に分割するようにPull Requestが投げられているようだががマージには至っていない。

Support Ransacks _all and _any requests by kewubenduben · Pull Request #5465 · activeadmin/activeadmin · GitHub

warden のコードリーディング

warden gem のコードリーディングしたのでメモ

warden とは

rack ベースの認証フレームワーク。 devise でお馴染みの gem ですね。

github.com

rack middleware の登録

warden は rack middleware として動作します。 rails とかだと config/application.rb あたりで以下のように rack middleware に登録することで warden を利用することができます。

# https://github.com/wardencommunity/warden/wiki/Setup
module Hoge
  class Application < Rails::Application
    config.use Warden::Manager do |manager|
      manager.default_strategies :password
      manager.failure_app = BadAuthenticationEndsUpHere
    end
  end
end

また、以下のように認証で利用する strategy の実態(認証処理)を登録してあげる必要があります。 Warden::Strategies.add で実態を登録した場合、Warden::Strategies::Base のサブクラスとして実態が定義されます。

# https://github.com/wardencommunity/warden/wiki/Strategies
Warden::Strategies.add(:password) do
  def valid?
    params['username'] || params['password']
  end

  def authenticate!
    u = User.authenticate(params['username'], params['password'])
    u.nil? ? fail!("Could not log in") : success!(u)
  end
end

ブロック内で行なっている処理を理解するためにまずは Warden::Manager を読んでいきます

Warden::Manager の initializer は以下のようになっています。

# https://github.com/wardencommunity/warden/blob/master/lib/warden/manager.rb#L19-L25

def initialize(app, options={})
  default_strategies = options.delete(:default_strategies)

  @app, @config = app, Warden::Config.new(options)
  @config.default_strategies(*default_strategies) if default_strategies
  yield @config if block_given?
end

Warden::Configインスタンスを生成し @config に代入。 ブロックが与えられていれば @config を引数にブロックを実行しています。

middleware 登録の時のブロック引数 manager の正体は Warden::Configインスタンスでした。 続いて ブロック内でやっている manager.default_strategies について理解するために Warden::Config を読みます。

# https://github.com/wardencommunity/warden/blob/master/lib/warden/config.rb#L63-L70

def default_strategies(*strategies)
  opts  = Hash === strategies.last ? strategies.pop : {}
  hash  = self[:default_strategies]
  scope = opts[:scope] || :_all

  hash[scope] = strategies.flatten unless strategies.empty?
  hash[scope] || hash[:_all] || []
end

引数 strategies を配列で受け取り、最後の要素が hash の場合は それを opts に代入しています。 続いて hash = self[:default_strategies] でデフォルトのstrategyを取得しています。 hash = self[:default_strategies] という記法はすこし奇妙に感じますが、Warden::Config が Hash Class のサブクラスだからこのように書けるのですね。

最後に opts から scope を取得し、 hash[scope] に引数 strategies を代入しています。

続いて manager.failure_app = BadAuthenticationEndsUpHere の処理を見ていきます。

# https://github.com/wardencommunity/warden/blob/master/lib/warden/config.rb#L21-L35

def self.hash_accessor(*names) #:nodoc:
  names.each do |name|
    class_eval <<-METHOD, __FILE__, __LINE__ + 1
      def #{name}
        self[:#{name}]
      end
      def #{name}=(value)
        self[:#{name}] = value
      end
    METHOD
  end
end

hash_accessor :failure_app, :default_scope, :intercept_401

hash_accessor により Warden::Config#failure_app= が動的に定義されていて、self[:failure_app] に代入をしています。

ここまでで、 冒頭のようにmiddleware を登録すると Warden::Manager@config は以下の内容になっていることがわかります

# 簡易化のためhash 形式かつ default_strategiesとfailure_app 以外のキーを除外しています
{
  default_strategies: {
    _all: [:password]
  },
  failure_app: BadAuthenticationEndsUpHere
}

middleware の呼び出し

rack middleware は callメソッドによって呼び出されます。早速 Warden::Manager の call メソッドを読んでみます。

# https://github.com/wardencommunity/warden/blob/master/lib/warden/manager.rb#L30-L48
def call(env) # :nodoc:
  return @app.call(env) if env['warden'] && env['warden'].manager != self
  env['warden'] = Proxy.new(env, self)
  result = catch(:warden) do
    env['warden'].on_request
    @app.call(env)
  end

  result ||= {}
  case result
  when Array
    handle_chain_result(result.first, result, env)
  when Hash
    process_unauthenticated(env, result)
  when Rack::Response
    handle_chain_result(result.status, result, env)
  end
end

env['warden']Warden::Proxyインスタンスを代入しています。 なぜ env にいれているかというと、この rack middleware の外からenv['warden'] を参照したいからだと思われます。

catch(:warden) で後続のブロック内の処理から大域脱出をできるようにしています。 ブロックの中を見ていきます。 env['warden'].on_request では詳細省略しますが、warden では以下のようにon_requestコールバックを登録することができ、on_requestコールバックが登録されていればここで実行されます。

Warden::Manager.on_request do |proxy|
  ...
end

コールバックを実行したら後続のmiddlewareおよび rack アプリケーションを実行しています。

認証処理

ここまでで、 Warden::Manager middlewareでは 認証方法(strategy)の指定や env['warden'] に Warden::Proxy のインスタンスを代入するなどの初期化処理をしていることがわかりました。

続いてrack アプリケーション内で実際に認証をする方法を見ていきます。

warden ではアプリケーションの任意の場所で以下のようにすることで認証を行うことができます。

env['warden'].authenticate!

env['warden'] は Warden::Proxy のインスタンスでしたね。では Warden::Proxy#authenticate! を見ていきましょう

# https://github.com/wardencommunity/warden/blob/master/lib/warden/proxy.rb#L132-L136
def authenticate!(*args)
  user, opts = _perform_authentication(*args)
  throw(:warden, opts) unless user
  user
end

_perform_authentication を実行して戻り値 user が存在しなければ throw(:warden, opts) で大域脱出。user が存在すれば そのまま user を返しています。

_perform_authentication を見ましょう

# https://github.com/wardencommunity/warden/blob/master/lib/warden/proxy.rb#L328-L343
def _perform_authentication(*args)
  scope, opts = _retrieve_scope_and_opts(args)
  user = nil

  # Look for an existing user in the session for this scope.
  # If there was no user in the session, see if we can get one from the request.
  return user, opts if user = user(opts.merge(:scope => scope))
  _run_strategies_for(scope, args)

  if winning_strategy && winning_strategy.successful?
    opts[:store] = opts.fetch(:store, winning_strategy.store?)
    set_user(winning_strategy.user, opts.merge!(:event => :authentication))
  end

  [@users[scope], opts]
end

_retrieve_scope_and_opts@config から scope と そのscope に対応する optionを取り出しているようです。 middleware 登録時に何も指定していない場合は scope:default それに対応するオプションは 空のhash になります。

続いて user メソッドを opts を引数に呼び出しています。 冒頭のようにmiddleware を登録した場合だと opts の中身は {scope: :default} になります。

user メソッドも割愛しますが、@users に該当 scope のキャッシュがいればそれを返し、なければ session の取り出しを試みます。複数 authenticate! が呼ばれてもパフォーマンスが劣化しないようにキャッシュしたり、一度認証が通ってsession が作成されればsessionが残っていれば再認証する必要がないのでこの処理を挟んでいるようです。

user メソッド を呼び出しても user が存在しなければ後続の _run_strategies_for(scope, args) に続きます。

# https://github.com/wardencommunity/warden/blob/master/lib/warden/proxy.rb#L353-L376
def _run_strategies_for(scope, args) #:nodoc:
  self.winning_strategy = @winning_strategies[scope]
  return if winning_strategy && winning_strategy.halted?

  # Do not run any strategy if locked
  return if @locked

  if args.empty?
    defaults   = @config[:default_strategies]
    strategies = defaults[scope] || defaults[:_all]
  end

  (strategies || args).each do |name|
    strategy = _fetch_strategy(name, scope)
    next unless strategy && !strategy.performed? && strategy.valid?
    catch(:warden) do
      _update_winning_strategy(strategy, scope)
    end

    strategy._run!
    _update_winning_strategy(strategy, scope)
    break if strategy.halted?
  end
end

winning_strategy は恐らく最後に呼び出したstrategyです。初回は nil にっているはずです。 args は env['warden'].authenticate! の引数でしたね。 今回の場合 authenticate! には何も渡してないので args は空です。 argsが空の場合、@config からstrategies を取り出します 「 rack middleware の登録」の箇所に書いた通り@configは以下のようになっているので、今回の場合 strategies は [:password, :basic] になります

{
  default_strategies: {
    _all: [:password]
  },
  failure_app: BadAuthenticationEndsUpHere
}

続いて取得した strategies をループ回して _fetch_strategy(name, scope) で strategyの実態の取り出し、 strategy._run! でstrategyの実行を行なっています。

# https://github.com/wardencommunity/warden/blob/master/lib/warden/proxy.rb#L379-L387
_fetch_strategy(name, scope)

def _fetch_strategy(name, scope)
  @strategies[scope][name] ||= if klass = Warden::Strategies[name]
    klass.new(@env, scope)
  elsif @config.silence_missing_strategies?
    nil
  else
    raise "Invalid strategy #{name}"
  end
end

冒頭で登録した strategy の実態を取り出し、そのインスタンスが返されています。 strategy の実態が定義されていない場合は 例外が返されます。 strategy を取得できたら strategy._run! を呼び出します。 strategy は Warden::Strategies::Base のサブクラスとして定義されているので Warden::Strategies::Base#_run! のを見ます。

# https://github.com/wardencommunity/warden/blob/master/lib/warden/strategies/base.rb#L52-L56
def _run! # :nodoc:
  @performed = true
  authenticate!
  self
end

@performed にして authenticate! を呼び出していますね。strategy の authenticate! メソッドは 冒頭で strategy の実態を登録する際に定義した認証処理でしたね。 authenticate! では任意の認証処理を実行して 失敗した場合は fail! 成功した場合は success!(user) を呼び出しています。

# https://github.com/wardencommunity/warden/blob/master/lib/warden/strategies/base.rb#L144-L147
def fail!(message = "Failed to Login")
  halt!
  @message = message
  @result = :failure
end

# https://github.com/wardencommunity/warden/blob/master/lib/warden/strategies/base.rb#L125-L130
def success!(user, message = nil)
  halt!
  @user = user
  @message = message
  @result = :success
end

fail! の場合は helt!@helted を true にして @result:failure を代入しています。 @helted が true の場合だと後続のstrategy が実行されないので、後続のstrategy もトライする場合は fail! ではなくfailで呼び出す必要があります。 success!helt!を実行し、 @user に認証によって取得することができた user を代入しています。

strategy._run! で認証処理を実行した後は_update_winning_strategy(strategy, scope) でwinning_strategyを直前に実行したstrategyに更新します。その後strategy.helted? を確認して true だと 認証が終了したとみなしループを抜けます。

_run_strategies_for の処理を見たので _perform_authentication に戻ります。

_run_strategies_for が終了すると winning_strategy.successful? で直前に実行された認証が成功しているか確認します。

成功していれば set_user(winning_strategy.user, opts.merge!(:event => :authentication)) を実行します。

# https://github.com/wardencommunity/warden/blob/master/lib/warden/proxy.rb#L170-L194
def set_user(user, opts = {})
  scope = (opts[:scope] ||= @config.default_scope)

  # Get the default options from the master configuration for the given scope
  opts = (@config[:scope_defaults][scope] || {}).merge(opts)
  opts[:event] ||= :set_user
  @users[scope] = user

  if opts[:store] != false && opts[:event] != :fetch
    options = env[ENV_SESSION_OPTIONS]
    if options
      if options.frozen?
        env[ENV_SESSION_OPTIONS] = options.merge(:renew => true).freeze
      else
        options[:renew] = true
      end
    end
    session_serializer.store(user, scope)
  end

  run_callbacks = opts.fetch(:run_callbacks, true)
  manager._run_callbacks(:after_set_user, user, self, opts) if run_callbacks

  @users[scope]
end

set_user では @usersに認証で取得できたuser のデータをキャッシュし、sessionに書き込んでいます。

set_user の処理が終わると [@users[scope], opts] を返して _perform_authentication の処理は終わりです。

_perform_authentication が終わったので authenticate! に戻ります。

だいぶ離れたのでコードをいかに再掲

def authenticate!(*args)
  user, opts = _perform_authentication(*args)
  throw(:warden, opts) unless user
  user
end

_perform_authentication は strategy で定義した認証が成功したら user は認証によって取得されたデータ。失敗したら nilが返ってきました。

認証が成功した場合は そのまま user を返して終わりです。 認証が失敗した場合は、認証が失敗した場合は throw(:warden, opts) で大域脱出をはかり、 Warden::Manager#call まで処理が戻ります。以下再掲

def call(env) # :nodoc:
  return @app.call(env) if env['warden'] && env['warden'].manager != self

  env['warden'] = Proxy.new(env, self)
  result = catch(:warden) do
    env['warden'].on_request
    @app.call(env)
  end

  result ||= {}
  case result
  when Array
    handle_chain_result(result.first, result, env)
  when Hash
    process_unauthenticated(env, result)
  when Rack::Response
    handle_chain_result(result.status, result, env)
  end
end

認証に成功した場合は result は@app.call(env) の戻り値になります。rack middleware の場合は基本的に Array になります。 認証に失敗した場合は throw(:warden, opts) の opts がresult に入ります。opts は Hash です。

認証が正常に終了した場合は result は Array のはずなので handle_chain_result を実行します。 handle_chain_result は基本的に result をそのまま返しています。

認証が失敗した場合は result は Hash なので process_unauthenticated(env, result) を実行します。

process_unauthenticated では 基本的に 冒頭で登録した failure_app の call メソッドを呼び出します。 failure_app をカスタマイズすることで、認証失敗時の処理を細かくカスタマイズすることができます。

終わりに

長くなったかつ最後の方はちょっと力尽きた感ありましたが、warden のコードを読む機会があったのでメモしておきます

社内でElasticsearch勉強会を開いた

同僚からのリクエストで社内Elasticsearch勉強会を開いた。

全文検索の基礎始まりElasticsearchの基礎的なことの話

直前まで資料作ってたので、発表しながらまとまりないな〜と思ったけど思いの外好評っぽくてよかった。

speakerdeck.com

sidekiq の scheduled jobの性能

sidekiq と scheduled job

ruby でよく使われるジョブキューにsidekiqというものがあります。 Rails などでは Active Job のバックエンドとして使うこともできます。

sidekiq は即時的に処理を行うだけではなく、Scheduled Jobという機能があり、例えば以下のようにすると1時間後にjobを実行することができます。

MyJob.perform_in(1.hour)

# active job
MyJob.set(wait: 1.hour).perform_later

また、即時で実行したい場合には以下のようにします

MyJob.perform_async

# active job 
MyJob.perform_later

sidekiq の queue の実装

sidekiqでは、queueにredisを使用しており、即時実行用の queue と scheduled job 用のqueueは異なった実装になっています。 それぞれについて、どのようにjobを溜めているのか、どのようにjobを取り出しているのかをみていきます。

即時実行

即時実行の場合、 redisのリストに

"queue:#{queue_name}"

というkeyでjobがpushされていきます。コードはこの辺です。

キューから取り出すときも単純で、単にリストからpopしているだけです。

scheduled job

scheduled job はソート済みセット型を使用しています。 n時間後に実行の場合、以下のようにtimeオブジェクトを浮動小数点数への変換を行い、変換した値をソート済みセット型のスコアとして使用しています。

# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/worker.rb#L55-L66

def perform_in(interval, *args)
  int = interval.to_f
    now = Time.now.to_f
    ts = (int < 1_000_000_000 ? now + int : int)

    payload = @opts.merge('class' => @klass, 'args' => args, 'at' => ts)
    # Optimization to enqueue something now that is scheduled to go out now or in the past
    payload.delete('at') if ts <= now
    @klass.client_push(payload)
  end
  alias_method :perform_at, :perform_in
end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/worker.rb#L136-L144
def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
  # stringify
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end

  Sidekiq::Client.new(pool).push(item)
end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/client.rb#L69-L77
def push(item)
  normed = normalize_item(item)
  payload = process_single(item['class'], normed)

  if payload
    raw_push([payload])
    payload['jid']
  end
end
# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/client.rb#L181-L206
def raw_push(payloads)
  @redis_pool.with do |conn|
    conn.multi do
      atomic_push(conn, payloads)
    end
  end
  true
end

def atomic_push(conn, payloads)
  if payloads.first['at']
    conn.zadd('schedule', payloads.map do |hash|
      at = hash.delete('at').to_s
      [at, Sidekiq.dump_json(hash)]
    end)
  else
    q = payloads.first['queue']
    now = Time.now.to_f
    to_push = payloads.map do |entry|
      entry['enqueued_at'] = now
      Sidekiq.dump_json(entry)
    end
    conn.sadd('queues', q)
    conn.lpush("queue:#{q}", to_push)
  end
end

続いて、queueからjobを取り出す処理を見ていきます。

  • ソート時みセット型から、Time.now.to_f.to_s 以下のスコアのjobを取得
  • 取得したjobをソート済みセット型から削除を試みる
  • 削除が成功すると、取得したjobを即時実行のqueueにpushする

という処理を行なっています

# https://github.com/mperham/sidekiq/blob/2ed92600fa71a9c275189d01df369ad4f8b9ca32/lib/sidekiq/scheduled.rb#L11-L33

def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  Sidekiq.redis do |conn|
    sorted_sets.each do |sorted_set|
      # Get the next item in the queue if it's score (time to execute) is <= now.
      # We need to go through the list one at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled queue and when
      # they are pushed onto a work queue and losing the jobs.
      while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do

        # Pop item off the queue and add it to the work queue. If the job can't be popped from
        # the queue, it's because another process already popped it so we can move on to the
        # next one.
        if conn.zrem(sorted_set, job)
          Sidekiq::Client.push(Sidekiq.load_json(job))
          Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
        end
      end
    end
  end
end

queueの実装のまとめ

sidekiqでは、即時実行のqueueからjobをpopし、実行するやつはworker、scheduled job のqueue からjobを取得し即時実行のqueueにpushするやつはpooler と呼ばれておりそれぞれ別のスレッドで動いています。

ごちゃごちゃ書きましたが、文章だけだとわかりづらいので図にするとこんな感じです。

f:id:ogidow:20180625103131p:plain

scheduled job のパフォーマンス

ようやく本題です。 ある時点に多くの job を schedule すると、scheduled job の性能が悪くなります。 ここでいう性能が悪いというのは「n分後にscheduleしたが、実際に実行されるまでに時間がかかる」ということを意味しています。

処理量が増えるので、1つの poller あたりの処理量が頭打ちになり性能が悪くなるのは当たり前と思うかもしれません。 しかし、この問題は poller を増やしてもあまり改善しません。 「なぜpoller を増やしても改善しないのか、また即時実行の場合はworker を増やせば大量の処理を捌けるのか」ということを簡単な実験をしながら検証していきます。

即時実行の場合

上で説明した通り、即時実行の場合はredisのリスト型の queue にobが積まれており、worker が queue に積まれているjobを取り出し実行します。

簡易的に queue に 10万件の job が積まれていることを以下のように表現します。

require 'redis'
redis = Redis.new(host: '127.0.0.1', port: '6379')

100000.times do |i|
  redis.rpush('hoge', i)
end

そして、次のように 並列数(worker の数)を増やしつつ、job を 全て queue から取り出す速度を計測します。

require 'benchmark'
require 'redis'
require 'connection_pool'

number_of_worker = 5

Benchmark.realtime do
  Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i|
    redis_pool.with do |conn|
      while job = conn.rpop('hoge') do
      end
    end
  end
end

結果は以下のようになりました。

並列数 実行時間
1 21.21864500000038
3 19.56006800000023
5 15.638484000000062
7 13.080714000000171
10 11.504145999999764

並列数が増えるにしたがって、実行時間が短くなっていることがわかります。 このことから、即時実行でパフォーマンスに問題がある場合に、worker の数を増やすことは一定の効果があると考えられます。

scheduled job の場合

scheduled job の場合はソート済みセット型をqueueとして利用しているので、以下のように10万件の job が積まれていることを表現します。

require 'redis'
redis = Redis.new(host: '127.0.0.1', port: '6379')

100000.times do |i|
  redis.zadd('hoge', rand, i)
end

そして、同じように 並列数を増やしつつ、job を 全て queue から取り出す速度を計測します。

require 'benchmark'
require 'redis'
require 'connection_pool'

number_of_worker = 5

Benchmark.realtime do
  Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i|
    redis_pool.with do |conn|
      while job = conn.zrangebyscore('hoge', 0, 1, :limit => [0, 1]).first do
        conn.zrem('hoge', job)
      end
    end
  end
end

ポイントは即時実行の場合はqueueから job を取得する処理とqueueから削除する処理が同時にできるのに対し、 scheduled job では取得と削除を別々にやってます。

計測結果は以下のようになりました。

並列数 実行時間
1 46.90739100000064
3 82.63857199999984
5 125.35556799999995
7 111.53749299999981
10 150.0042739999999

並列数が増えるに連れて実行時間は長くなる傾向にあるようです。

scheduled job では並列で、queue から job の取得・削除を行うので以下のずのように競合が発生します。 競合が発生すると、片方の poller の処理が無駄になり効率が悪くなってしまいます。 並列数が増えると競合する確率が高くなり、処理の効率が落ち結果的に実行時間が長くなってしまうのかもしれません。

f:id:ogidow:20180626134443p:plain

実際にどのくらいの回数競合しているかを調べて見ます。

redis.del 'conflict'

Parallel.each((1..number_of_worker).to_a, in_processes: number_of_worker) do |i|
    redis_pool.with do |conn|
      while job = conn.zrangebyscore('hoge', 0, 1, :limit => [0, 1]).first do
        unless conn.zrem('hoge', job)
          conn.rpush('conflict', rand)
        end
      end
    end
  end

redis.llen 'conflict'
並列数 競合数
1 0
3 90490
5 164632
7 219304
10 308126

並列数が増えるに連れて、競合数も増えていることがわかります。 今回の実験では job数が10万件だったので、単純に10万回処理すれば良いはずですが、並列数10の場合は 308126回も処理が無駄になっています。

もちろん、実際は各poller が queue をポーリングするタイミングはもう少しばらけますが、poller が増えれば競合する確率も高くなるということには変わりありません。

同じ時間帯に大量の job を scheduling していて、実際に実行される時間までラグがあるという場合には、処理を1つのjobにできるだけまとめて、jobの数自体を減らすなどの工夫が必要かもしれません*1

*1:処理をまとめるかつ冪等なjobを書かないといけないという難しさはありますが...