'Low' と 'High' の 2 つのキューにワーカーをサブスクライブしていることを考えると、優先度の高いキューが空の場合、ワーカーは優先度の低いキューからのメッセージのみを処理したいと考えています。
ここで提案されているように、2 つのチャネルを定義し、優先度の高いキューでプリフェッチをより高い値に設定することで、これを実行しようとしています。-rabbitmq.html
これは私のワーカーコードです:
require "rubygems"
require "amqp"
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel_low = AMQP::Channel.new(connection)
channel_high = AMQP::Channel.new(connection)
# Attempting to set the prefetch higher on the high priority queue
channel_low.prefetch(10)
channel_high.prefetch(20)
low_queue = channel_low.queue("low", :auto_delete => false)
high_queue = channel_high.queue("high", :auto_delete => false)
low_queue.subscribe do |payload|
puts "#{payload}"
slow_task
end
high_queue.subscribe do |payload|
puts "#{payload}"
slow_task
end
def slow_task
# Do some slow work
sleep(1)
end
end
このクライアントを実行すると、優先度の高いメッセージが最初に処理されません。
require "rubygems"
require "amqp"
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel = AMQP::Channel.new(connection)
low_queue = channel.queue("low")
high_queue = channel.queue("high")
exchange = channel.direct("")
10.times do |i|
message = "LOW #{i}"
puts "sending: #{message}"
exchange.publish message, :routing_key => low_queue.name
end
# EventMachine.add_periodic_timer(0.0001) do
10.times do |i|
message = "HIGH #{i}"
puts "sending: #{message}"
exchange.publish message, :routing_key => high_queue.name
end
end
出力:
Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9
Server >>>
HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9