以下は、現在の実装のようなものだと思います。
class Worker
def initialize queue
@queue = queue
dequeue
end
def dequeue
@queue.pop do |item|
begin
work_on item
ensure
dequeue
end
end
end
def work_on item
case item.type
when :reload_credentials
# magic happens here
else
# more magic happens here
end
end
end
q = EM::Queue.new
workers = Array.new(10) { Worker.new q }
上記の問題は、私の理解が正しければ、worker が reload_credentials ジョブよりも新しいジョブ (プロデューサー タイムラインで以前に到着したジョブ)に取り組んでほしくないということです。以下はこれを提供する必要があります(最後に追加の注意事項)。
class Worker
def initialize queue
@queue = queue
dequeue
end
def dequeue
@queue.pop do |item|
begin
work_on item
ensure
dequeue
end
end
end
def work_on item
case item.type
when :reload_credentials
# magic happens here
else
# more magic happens here
end
end
end
class LockingDispatcher
def initialize channel, queue
@channel = channel
@queue = queue
@backlog = []
@channel.subscribe method(:dispatch_with_locking)
@locked = false
end
def dispatch_with_locking item
if locked?
@backlog << item
else
# You probably want to move the specialization here out into a method or
# block that's passed into the constructor, to make the lockingdispatcher
# more of a generic processor
case item.type
when :reload_credentials
lock
deferrable = CredentialReloader.new(item).start
deferrable.callback { unlock }
deferrable.errback { unlock }
else
dispatch_without_locking item
end
end
end
def dispatch_without_locking item
@queue << item
end
def locked?
@locked
end
def lock
@locked = true
end
def unlock
@locked = false
bl = @backlog.dup
@backlog.clear
bl.each { |item| dispatch_with_locking item }
end
end
channel = EM::Channel.new
queue = EM::Queue.new
dispatcher = LockingDispatcher.new channel, queue
workers = Array.new(10) { Worker.new queue }
したがって、最初のシステムへの入力は で入りますq
が、この新しいシステムでは で入りchannel
ます。はqueue
引き続きワーカー間の作業分散に使用されますが、queue
資格情報の更新操作が行われている間はデータが取り込まれません。残念ながら、私はそれ以上の時間を取らなかったのでLockingDispatcher
、商品の種類や発送のコードと結び付けられていないなどの一般化はしていませんCredentialsReloader
。お任せします。
ここで、これはあなたの最初のリクエストについて私が理解しているサービスですが、一般的にはこの種の要件を緩和する方がよいことに注意してください. その要件を変更しないと本質的に根絶できない未解決の問題がいくつかあります。
- システムは、資格情報ジョブを開始する前にジョブの実行が完了するのを待たない
- システムは資格情報ジョブのバーストを非常にうまく処理しません。処理可能な他のアイテムは処理できません。
- 資格情報コードにバグがある場合、バックログが RAM をいっぱいにして障害を引き起こす可能性があります。コードが中止可能であり、その後のメッセージがそれ以上のデッドロックを回避するのに十分に処理可能である場合、単純なタイムアウトで壊滅的な影響を回避するのに十分な場合があります。
実際には、システムにユーザーIDの概念があるようです。要件を熟考すると、資格情報の更新状態にあるユーザー ID に関連する項目のみをバックログする必要がある可能性があります。これは別の問題であり、別の種類のディスパッチが関係しています。それらのユーザーのロックされたバックログのハッシュを試してください。認証情報の完了時にコールバックを使用して、それらのバックログをワーカーに排出するか、同様の取り決めを行います。
幸運を!