1

私は、RabbitMQ を使用してキューにメッセージを送信するシステムを Ruby で開発しています。私は使っている:

この gem で見たほとんどの例では、EM.add_periodic_timer ブロックにパブリッシュ呼び出しがあります。これは、大多数のユースケースであると思われるものでは機能せず、確かに私のものでは機能しません。いくつかの作業を完了するときにメッセージを発行する必要があるため、発行ステートメントを add_periodic_timer ブロックに入れるだけでは不十分です。

そのため、いくつかのメッセージをキューに発行し、それを「フラッシュ」して、発行したメッセージがサブスクライバーに配信されるようにする方法を見つけようとしています。

私が言いたいことを理解してもらうために、次の発行者コードを考えてみてください。

#!/usr/bin/ruby

require 'rubygems'
require 'mq'

MESSAGES = ["hello","goodbye","test"]

AMQP.start do

  queue = MQ.queue('testq')
  messages_published = 0

  while (messages_published < 50)

    if (rand() < 0.4)
      message = MESSAGES[rand(MESSAGES.size)]
      puts "#{Time.now.to_s}: Publishing: #{message}"
      queue.publish(message)
      messages_published += 1
    end

    sleep(0.1)

  end

  AMQP.stop do
    EM.stop
  end

end

したがって、このコードは単純にループし、ループの各反復で 40% の確率でメッセージを公開し、その後 0.1 秒間スリープします。50 件のメッセージが公開されるまでこれを行い、その後 AMQP を停止します。もちろん、これは概念実証にすぎません。

さて、私のサブスクライバーコード:

#!/usr/bin/ruby

require 'rubygems'
require 'mq'

AMQP.start do

  queue = MQ.queue('testq') 
  queue.subscribe do |header, msg|
    puts "#{Time.now.to_s}: Received #{msg}"
  end

end

したがって、キューにサブスクライブするだけで、受信したメッセージごとに出力します。

パブリッシャーが AMQP.stop を呼び出したときに、サブスクライバーが 50 個のメッセージすべてを受信することを除けば、素晴らしいことです。

これが私の出版社からの出力です。簡潔にするために途中で切り捨てられています。

$ ruby publisher.rb 
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:42 -0400: Publishing: hello
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:43 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: test
2010-04-14 21:45:45 -0400: Publishing: test
.
.
.
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: goodbye

次に、サブスクライバーからの出力:

$ ruby consumer.rb
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received hello
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
.
.
.
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye

出力のタイムスタンプに注意すると、サブスクライバーは、パブリッシャーが AMQP を停止して終了した後にのみ、すべてのメッセージを受信します。

では、AMQP の初心者として、メッセージをすぐに配信するにはどうすればよいでしょうか? パブリッシャーの while ループの本文に AMQP.start と AMQP.stop を入れてみましたが、最初のメッセージしか配信されませんでしたが、奇妙なことに、ログを有効にすると、サーバーからエラー メッセージが報告されず、メッセージはキューに送信されますが、サブスクライバーによって受信されることはありません。

提案をいただければ幸いです。読んでくれてありがとう。

4

1 に答える 1

2

この問題に関する情報が必要な場合は、 http: //groups.google.com/group/ruby-amqp/browse_thread/thread/311965bcf9697eceを参照してください

パブリッシャー コード内に追加のスレッドを追加して、問題を修正しました。

!/usr/bin/ruby

require 'rubygems'
require 'mq'

MESSAGES = ["hello","goodbye","test"]

AMQP.start do

  Thread.new do
      queue = MQ.queue('testq')
      messages_published = 0

      while (messages_published < 50)

        if (rand() < 0.4)
          message = MESSAGES[rand(MESSAGES.size)]
          puts "#{Time.now.to_s}: Publishing: #{message}"
          queue.publish(message)
          messages_published += 1
        end

        sleep(0.1)

      end

      AMQP.stop do
        EM.stop
      end

  end

end
于 2010-04-15T02:04:13.210 に答える