2

'Low' と 'High' の 2 つのキューにワーカーをサブスクライブしていることを考えると、優先度の高いキューが空の場合、ワーカーは優先度の低いキューからのメッセージのみを処理したいと考えています。

ここで提案されているように、2 つのチャネルを定義し、優先度の高いキューでプリフェッチをより高い値に設定することで、これを実行しようとしています。-rabbitmq.html

これは私のワーカーコードです:

require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel_low  = AMQP::Channel.new(connection)
  channel_high  = AMQP::Channel.new(connection)

  # Attempting to set the prefetch higher on the high priority queue
  channel_low.prefetch(10)
  channel_high.prefetch(20)

  low_queue    = channel_low.queue("low", :auto_delete => false)
  high_queue    = channel_high.queue("high", :auto_delete => false)

  low_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end

  high_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end

  def slow_task
    # Do some slow work
    sleep(1)
  end
end

このクライアントを実行すると、優先度の高いメッセージが最初に処理されません。

require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel  = AMQP::Channel.new(connection)

  low_queue    = channel.queue("low")
  high_queue    = channel.queue("high")
  exchange = channel.direct("")

  10.times do |i| 
    message = "LOW #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => low_queue.name
  end

  # EventMachine.add_periodic_timer(0.0001) do
  10.times do |i|
    message = "HIGH #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => high_queue.name
  end

end

出力:

Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9

Server >>>

HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9
4

4 に答える 4

2

マイケルが言ったように、現在のアプローチにはいくつかの問題があります。

  • 明示的な ack を有効にしないということは、RabbitMQ がメッセージを処理したときではなく、送信したときに配信されたと見なすことを意味します。
  • メッセージは非常に小さいため、ネットワーク経由ですばやく配信できます
  • サブスクライブ ブロックは、EventMachine がネットワーク データを読み取るときに、ソケットから読み取ったデータのフル フレームごとに 1 回呼び出されます。
  • 最後に、reactor スレッドを (スリープ状態で) ブロックすると、EM が ack をソケットに送信できなくなり、適切な動作が得られなくなります。

優先度の概念を実装するには、ネットワーク データの受信と確認を分離する必要があります。私たちのアプリケーション (私がブログ記事で書いたもの) では、バックグラウンド スレッドとプライオリティ キューを使用して、入ってくる作業を並べ替えました。これにより、各ワーカーにメッセージの小さなバッファーが導入されます。これらのメッセージの一部は、処理する優先度の高いメッセージがなくなるまで処理されない優先度の低いメッセージである可能性があります。

以下は、ワーカー スレッドと優先度キューを使用して目的の結果を取得する、わずかに変更されたワーカー コードです。

require "rubygems"
require "amqp"
require "pqueue"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel_low  = AMQP::Channel.new(connection)
  channel_high  = AMQP::Channel.new(connection)

  # Attempting to set the prefetch higher on the high priority queue
  channel_low.prefetch(10)
  channel_high.prefetch(20)

  low_queue    = channel_low.queue("low", :auto_delete => false)
  high_queue    = channel_high.queue("high", :auto_delete => false)

  # Our priority queue for buffering messages in the worker's memory
  to_process = PQueue.new {|a,b| a[0] > b[0] }

  # The pqueue gem isn't thread safe
  mutex = Mutex.new

  # Background thread for working blocking operation. We can spin up more of
  # these to increase concurrency.
  Thread.new do
    loop do
      _, header, payload = mutex.synchronize { to_process.pop }

      if payload
        puts "#{payload}"
        slow_task
        # We need to call ack on the EM thread.
        EM.next_tick { header.ack }
      else
        sleep(0.1)
      end
    end
  end

  low_queue.subscribe(:ack => true) do |header, payload|
    mutex.synchronize { to_process << [0, header, payload] }
  end

  high_queue.subscribe(:ack => true) do |header, payload|
    mutex.synchronize { to_process << [10, header, payload] }
  end

  def slow_task
    # Do some slow work
    sleep(1)
  end
end

同時実行性を高める必要がある場合は、複数のバックグラウンド スレッドを生成できます。

于 2013-06-06T20:50:58.393 に答える
1

あなたのアプローチは、最も一般的に使用される回避策の 1 つです。ただし、投稿された特定の例にはいくつかの問題があります。

プリフェッチは、どのチャネルが配信の優先順位を持つかを制御しません。「進行中」(未確認) のメッセージの数を制御します。

これは貧弱な優先順位付け手法として使用できますが、自動承認モードを使用するため、プリフェッチは機能しません (RabbitMQ はメッセージを送信するとすぐに、メッセージが承認されたと見なします)。

少数のメッセージのみをパブリッシュし、サンプルの実行が終了した場合、順序はメッセージをパブリッシュする順序に大きく依存する可能性が高くなります。

手動確認応答でプリフェッチ設定の効果を確認するには、より長い時間実行する必要があります (これはメッセージ レートによって異なりますが、少なくとも 1 分間は.

于 2013-06-06T10:46:17.150 に答える