0

RabbitMQ メッセージが ACK された後にコンシューマーに再配信されるのはなぜですか? 私はRabbitMQを初めて使用します。私の使い方が間違っているか、Ruby amqp gem に何か問題があるのか​​もしれません。

キューにサブスクライブして各メッセージを確認する ruby​​ スクリプトがあります。メッセージを最後まで処理すると、メッセージは実際にキューから消えます。再配信されません。しかし、すべてのメッセージが ACK される前にスクリプトを中断し、スクリプトを再開すると、最初のメッセージから新たに配信が開始されます。

コードで見られる動作は、RabbitMQ Web 管理インターフェイスによって正確に反映されています。キューにはメッセージがあり、ACK があっても消えません。

手がかり: 約 5000 件のメッセージをキューに入れました。コンシューマーにかなりの量の ACK を許可すると、実際にはいくつかのメッセージがキューから削除されたように見えます (上で述べたこととは逆です)。私はこの現象を突き止めることができませんでした。

Ruby 1.9.3、RabbitMQ 2.8.7、および amqp ruby​​ gem 0.9.8 を使用しています。これは、Ubuntu 12.0.4 または Mac OS 10.7.4 のプロデューサーとコンシューマーで発生します。

一体何??

(このメッセージの最後にある更新を参照してください)

コンシューマのコードは次のとおりです。

# encoding: utf-8
require "rubygems"
require 'amqp'
require 'aws-sdk'

queue_name = "some.queue"
begin
  AMQP.start("amqp://localhost:5672") do | connection |
    channel  = AMQP::Channel.new(connection)
    queue = channel.queue(queue_name, :durable => true)
    queue.subscribe(:ack => true) do | metadata, payload |
      metadata.ack
    end
  end
end

そして、ここにプロデューサーがあります:

# encoding: utf-8
require "rubygems"
require 'amqp'
require 'aws-sdk'

msg = ARGV[0]
queue_name = "some.queue"
begin
  AMQP.start("amqp://localhost:5672") do | connection |
    channel  = AMQP::Channel.new(connection)
    queue    = channel.queue(queue_name, :durable => true)
    (1..5000).each do | x |
      channel.default_exchange.publish x, :routing_key => queue_name, :persistent => true
    end
  end
end

Wireshark を使用して、送信していた ack がブローカーに送信されていないことを確認しました。metadata.ack を呼び出しましたが、パケットは送信されませんでした。

@Robthewolf のアドバイスに基づいて、channel.prefetch(1) を試しました。その呼び出しを使用すると、すべての ack がブローカーに到達しました。一般に、channel.prefetch(n) を呼び出した場合、n 個 (場合によっては n+1 個) の ack を送信すると、それらがブローカーに送信されます。

そこで、新たな質問があります: prefetch() パラメーターは、最終的にブローカーに送信される前に送信する必要がある ack の数を決定するのはなぜですか?

4

1 に答える 1

1

コンシューマーが停止している場合、未確認のメッセージはすべてキューに返されます。少数のみが確認応答された場合、それらはキューに戻されませんが、残りは戻されます。を使用channel.basicQos(1);して、キューから一度に 1 つのアイテムのみを読み取るようにすることができます。最初のアイテムが ack されるまで、新しいアイテムは読み込まれません。

于 2012-11-07T05:52:24.083 に答える