12

こんにちは、

私は2つの別々の、しかし関連するアプリを持っています。両方とも独自のバックグラウンドキューを持っている必要があります(個別のSidekiqおよびRedisプロセスを参照)app2ただし、ときどきからのキューにジョブをプッシュできるようにしたいと思いますapp1

単純なキュー/プッシュの観点から、app1既存のSidekiq/Redisスタックがない場合はこれを簡単に行うことができます。

# In a process, far far away

# Configure client 
Sidekiq.configure_client do |config|
  config.redis = { :url => 'redis://redis.example.com:7372/12', :namespace => 'mynamespace' }
end

# Push jobs without class definition 
Sidekiq::Client.push('class' => 'Example::Workers::Trace', 'args' => ['hello!'])

# Push jobs overriding default's 
Sidekiq::Client.push('queue' => 'example', 'retry' => 3, 'class' =>     'Example::Workers::Trace', 'args' => ['hello!'])

しかし、私がすでにaSidekiq.configure_clientSidekiq.configure_serverfromを呼んでいたことを考えるとapp1、おそらくここの間に何かが起こる必要があるステップがあります。

明らかに、Sidekiqの内部からシリアル化と正規化のコードを直接取得し、手動でapp2のredisキューにプッシュすることもできますが、それは脆弱な解決策のようです。機能を利用できるようにしたいと思いClient.pushます。

私の理想的な解決策は次のようなものだと思います。

SidekiqTWO.configure_client { remote connection..... } SidekiqTWO::Client.push(job....)

あるいは:

$redis_remote = remote_connection.....

Sidekiq::Client.push(job, $redis_remote)

明らかに少し面白味がありますが、それが私の理想的なユースケースです。

ありがとう!

4

3 に答える 3

10

つまり、FAQによると、「Sidekiqメッセージ形式は非常にシンプルで安定しています。JSON形式のハッシュにすぎません」ということです。私の強調-JSONをsidekiqに送信するのはあまりにも脆弱ではないと思います。特に、OPの状況のように、ジョブの送信先のRedisインスタンスをきめ細かく制御したい場合は、キューに入れられているジョブとともにRedisインスタンスを示すことができる小さなラッパーを作成するだけです。

ジョブをRedisインスタンスにラウンドロビンするKevinBedellのより一般的な状況では、使用するRedisインスタンスを制御したくないと思います。キューに入れて、ディストリビューションを自動的に管理するだけです。これまでにこれを要求したのは1人だけのようで、彼らは以下を使用するソリューションを考え出しましたRedis::Distributed

datastore_config = YAML.load(ERB.new(File.read(File.join(Rails.root, "config", "redis.yml"))).result)

datastore_config = datastore_config["defaults"].merge(datastore_config[::Rails.env])

if datastore_config[:host].is_a?(Array)
  if datastore_config[:host].length == 1
    datastore_config[:host] = datastore_config[:host].first
  else
    datastore_config = datastore_config[:host].map do |host|
      host_has_port = host =~ /:\d+\z/

      if host_has_port
        "redis://#{host}/#{datastore_config[:db] || 0}"
      else
        "redis://#{host}:#{datastore_config[:port] || 6379}/#{datastore_config[:db] || 0}"
      end
    end
  end
end

Sidekiq.configure_server do |config|
  config.redis = ::ConnectionPool.new(:size => Sidekiq.options[:concurrency] + 2, :timeout => 2) do
    redis = if datastore_config.is_a? Array
      Redis::Distributed.new(datastore_config)
    else
      Redis.new(datastore_config)
    end

    Redis::Namespace.new('resque', :redis => redis)
  end
end

高可用性とフェイルオーバーを実現するために考慮すべきもう1つのことは、信頼性機能を備えたSidekiq Proを入手することです。「SidekiqProクライアントは、一時的なRedisの停止に耐えることができます。エラーが発生すると、ローカルでジョブをキューに入れ、それらのジョブの配信を試みます。接続が復元されたら」とにかくsidekiqはバックグラウンドプロセス用であるため、Redisインスタンスがダウンした場合の短い遅延はアプリケーションに影響を与えません。2つのRedisインスタンスの1つがダウンし、ラウンドロビンを使用している場合、この機能を使用していない限り、まだいくつかのジョブが失われています。

于 2013-11-10T03:40:10.093 に答える
4

carols10centsが言うように、それは非常に単純ですが、私は常に機能をカプセル化し、他のプロジェクトで再利用できるようにするのが好きなので、HotelTonightのブログからアイデアを更新しました。この次のソリューションは、Rails4.1とSpringプリローダーに耐えられないHotelTonightを改善します。

現在、私は次のファイルを追加することでやり遂げていますlib/remote_sidekiq/

remote_sidekiq.rb

class RemoteSidekiq
  class_attribute :redis_pool
end

remote_sidekiq_worker.rb

require 'sidekiq'
require 'sidekiq/client'

module RemoteSidekiqWorker
  def client
    pool = RemoteSidekiq.redis_pool || Thread.current[:sidekiq_via_pool] || Sidekiq.redis_pool
    Sidekiq::Client.new(pool)
  end

  def push(worker_name, attrs = [], queue_name = "default")
    client.push('args' => attrs, 'class' => worker_name, 'queue' => queue_name)
  end
end

redis_poolを設定するイニシャライザーを作成する必要があります

config / initializers / remote_sidekiq.rb

url = ENV.fetch("REDISCLOUD_URL")
namespace = 'primary'

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

Aleksによる編集:

行の代わりに、sidekiqの決してバージョンでは:

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

行を使用する:

redis_remote_options = {
  namespace: "yournamespace",
  url: ENV.fetch("REDISCLOUD_URL")
}

RemoteSidekiq.redis_pool = Sidekiq::RedisConnection.create(redis_remote_options)

その後、include RemoteSidekiqWorker必要な場所にモジュールを配置できます。仕事は終わりました!

****より大きな環境の場合****

RemoteWorkerモデルを追加すると、次のような利点が追加されます。

  1. ターゲットのsidekiqワーカーにアクセスできるシステムを含め、どこでもRemoteWorkersを再利用できます。これは、発信者には透過的です。ターゲットsidekiqシステム内で「RemoteWorkers」フォームを使用するには、デフォルトでローカルSidekiqクライアントを使用するため、初期化子を使用しないでください。
  2. RemoteWorkersを使用すると、正しい引数が常に送信されるようになります(コード=ドキュメント)
  3. より複雑なSidekiqアーキテクチャを作成することによるスケールアップは、呼び出し元には透過的です。

これがRemoteWorkerの例です

class RemoteTraceWorker
  include RemoteSidekiqWorker
  include ActiveModel::Model

  attr_accessor :message

  validates :message, presence: true

  def perform_async
    if valid?
      push(worker_name, worker_args)
    else
      raise ActiveModel::StrictValidationFailed, errors.full_messages
    end
  end

  private

  def worker_name
    :TraceWorker.to_s
  end

  def worker_args
    [message]
  end
end
于 2015-04-27T08:49:50.793 に答える
1

を使用しているため、これに遭遇し、いくつかの問題が発生しましたActiveJob。これにより、メッセージがキューから読み取られる方法が複雑になります。

AROの回答に基づいて、redis_poolの設定が必要になります。

remote_sidekiq.rb

class RemoteSidekiq
  class_attribute :redis_pool
end

config / initializers / remote_sidekiq.rb

url = ENV.fetch("REDISCLOUD_URL")
namespace = 'primary'

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

ここで、ワーカーの代わりに、リクエストをキューに入れるためのActiveJobアダプターを作成します。

lib / active_job / queue_adapters / remote_sidekiq_adapter.rb

require 'sidekiq'

module ActiveJob
  module QueueAdapters
    class RemoteSidekiqAdapter
      def enqueue(job)
        #Sidekiq::Client does not support symbols as keys
        job.provider_job_id = client.push \
          "class"   => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ]
      end

      def enqueue_at(job, timestamp)
        job.provider_job_id = client.push \
          "class"   => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ],
          "at"      => timestamp
      end

      def client
        @client ||= ::Sidekiq::Client.new(RemoteSidekiq.redis_pool)
      end
    end
  end
end

アダプターを使用して、イベントをキューに入れることができます。

require 'active_job/queue_adapters/remote_sidekiq_adapter'

class RemoteJob < ActiveJob::Base
  self.queue_adapter = :remote_sidekiq

  queue_as :default

  def perform(_event_name, _data)
    fail "
      This job should not run here; intended to hook into
      ActiveJob and run in another system
    "
  end
end

これで、通常のActiveJobAPIを使用してジョブをキューに入れることができます。アプリがこれをキューから読み取る場合はRemoteJob、アクションを実行するために一致するものを使用できる必要があります。

于 2016-10-16T19:54:15.357 に答える