私は、RabbitMQ を使用してキューにメッセージを送信するシステムを Ruby で開発しています。私は使っている:
- Ruby 1.9.1 安定版
- ウサギMQ 1.7.2
- AMQP gem v0.6.7 ( http://github.com/tmm1/amqp )
この gem で見たほとんどの例では、EM.add_periodic_timer ブロックにパブリッシュ呼び出しがあります。これは、大多数のユースケースであると思われるものでは機能せず、確かに私のものでは機能しません。いくつかの作業を完了するときにメッセージを発行する必要があるため、発行ステートメントを add_periodic_timer ブロックに入れるだけでは不十分です。
そのため、いくつかのメッセージをキューに発行し、それを「フラッシュ」して、発行したメッセージがサブスクライバーに配信されるようにする方法を見つけようとしています。
私が言いたいことを理解してもらうために、次の発行者コードを考えてみてください。
#!/usr/bin/ruby
require 'rubygems'
require 'mq'
MESSAGES = ["hello","goodbye","test"]
AMQP.start do
queue = MQ.queue('testq')
messages_published = 0
while (messages_published < 50)
if (rand() < 0.4)
message = MESSAGES[rand(MESSAGES.size)]
puts "#{Time.now.to_s}: Publishing: #{message}"
queue.publish(message)
messages_published += 1
end
sleep(0.1)
end
AMQP.stop do
EM.stop
end
end
したがって、このコードは単純にループし、ループの各反復で 40% の確率でメッセージを公開し、その後 0.1 秒間スリープします。50 件のメッセージが公開されるまでこれを行い、その後 AMQP を停止します。もちろん、これは概念実証にすぎません。
さて、私のサブスクライバーコード:
#!/usr/bin/ruby
require 'rubygems'
require 'mq'
AMQP.start do
queue = MQ.queue('testq')
queue.subscribe do |header, msg|
puts "#{Time.now.to_s}: Received #{msg}"
end
end
したがって、キューにサブスクライブするだけで、受信したメッセージごとに出力します。
パブリッシャーが AMQP.stop を呼び出したときに、サブスクライバーが 50 個のメッセージすべてを受信することを除けば、素晴らしいことです。
これが私の出版社からの出力です。簡潔にするために途中で切り捨てられています。
$ ruby publisher.rb
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:42 -0400: Publishing: hello
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:43 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: test
2010-04-14 21:45:45 -0400: Publishing: test
.
.
.
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: goodbye
次に、サブスクライバーからの出力:
$ ruby consumer.rb
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received hello
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
.
.
.
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
出力のタイムスタンプに注意すると、サブスクライバーは、パブリッシャーが AMQP を停止して終了した後にのみ、すべてのメッセージを受信します。
では、AMQP の初心者として、メッセージをすぐに配信するにはどうすればよいでしょうか? パブリッシャーの while ループの本文に AMQP.start と AMQP.stop を入れてみましたが、最初のメッセージしか配信されませんでしたが、奇妙なことに、ログを有効にすると、サーバーからエラー メッセージが報告されず、メッセージはキューに送信されますが、サブスクライバーによって受信されることはありません。
提案をいただければ幸いです。読んでくれてありがとう。