RabbitMQ メッセージが ACK された後にコンシューマーに再配信されるのはなぜですか? 私はRabbitMQを初めて使用します。私の使い方が間違っているか、Ruby amqp gem に何か問題があるのかもしれません。
キューにサブスクライブして各メッセージを確認する ruby スクリプトがあります。メッセージを最後まで処理すると、メッセージは実際にキューから消えます。再配信されません。しかし、すべてのメッセージが ACK される前にスクリプトを中断し、スクリプトを再開すると、最初のメッセージから新たに配信が開始されます。
コードで見られる動作は、RabbitMQ Web 管理インターフェイスによって正確に反映されています。キューにはメッセージがあり、ACK があっても消えません。
手がかり: 約 5000 件のメッセージをキューに入れました。コンシューマーにかなりの量の ACK を許可すると、実際にはいくつかのメッセージがキューから削除されたように見えます (上で述べたこととは逆です)。私はこの現象を突き止めることができませんでした。
Ruby 1.9.3、RabbitMQ 2.8.7、および amqp ruby gem 0.9.8 を使用しています。これは、Ubuntu 12.0.4 または Mac OS 10.7.4 のプロデューサーとコンシューマーで発生します。
一体何??
(このメッセージの最後にある更新を参照してください)
コンシューマのコードは次のとおりです。
# encoding: utf-8
require "rubygems"
require 'amqp'
require 'aws-sdk'
queue_name = "some.queue"
begin
AMQP.start("amqp://localhost:5672") do | connection |
channel = AMQP::Channel.new(connection)
queue = channel.queue(queue_name, :durable => true)
queue.subscribe(:ack => true) do | metadata, payload |
metadata.ack
end
end
end
そして、ここにプロデューサーがあります:
# encoding: utf-8
require "rubygems"
require 'amqp'
require 'aws-sdk'
msg = ARGV[0]
queue_name = "some.queue"
begin
AMQP.start("amqp://localhost:5672") do | connection |
channel = AMQP::Channel.new(connection)
queue = channel.queue(queue_name, :durable => true)
(1..5000).each do | x |
channel.default_exchange.publish x, :routing_key => queue_name, :persistent => true
end
end
end
Wireshark を使用して、送信していた ack がブローカーに送信されていないことを確認しました。metadata.ack を呼び出しましたが、パケットは送信されませんでした。
@Robthewolf のアドバイスに基づいて、channel.prefetch(1) を試しました。その呼び出しを使用すると、すべての ack がブローカーに到達しました。一般に、channel.prefetch(n) を呼び出した場合、n 個 (場合によっては n+1 個) の ack を送信すると、それらがブローカーに送信されます。
そこで、新たな質問があります: prefetch() パラメーターは、最終的にブローカーに送信される前に送信する必要がある ack の数を決定するのはなぜですか?