1

私には満たさなければならない興味深い状況があります。AMQP キュー内のメッセージを待機して待機する EventMachine ループが必要ですが、定期的に別の AMQP キューにメッセージを送信するためにそのループを中断します。私は EventMachine を初めて使用します。これは、EventMachine ループが必要なメッセージを送信しないことを除いて、これまでのところ私が持っているものです。

今、私は2つのプロシージャを作成しました:

    listen_loop = Proc.new {
        AMQP.start(connection_config) do |connection|
            AMQP::Channel.new(connection) do |channel|
                channel.queue("queue1", :exclusive => false, :durable => true) do |requests_queue|
                    requests_queue.once_declared do
                        consumer = AMQP::Consumer.new(channel, requests_queue).consume
                        consumer.on_delivery do |metadata, payload|
                            puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
                            response = "responding"
                            channel.default_exchange.publish(response,
                                :routing_key    => metadata.reply_to,
                                :correlation_id => metadata.message_id,
                                :mandatory      => true)
                            metadata.ack
                        end
                    end
                end
            end
        end
        Signal.trap("INT")  { AMQP.stop { EM.stop } }
        Signal.trap("TERM") { AMQP.stop { EM.stop } }
    }

    send_message = Proc.new {
        AMQP.start(connection_config) do |connection|
            channel = AMQP::Channel.new(connection)
            queue   = channel.queue('queue2')

            channel.default_exchange.publish("hello world", :routing_key => queue.name)
            EM.add_timer(0.5) do
                connection.close do
                    EM.stop{ exit }
                end
            end
        end
    }

そして、EventMachine ループがあります。

    EM.run do 
        EM.add_periodic_timer(5) { send_message.call }
        listen_loop.call
    end

リッスン ループでメッセージを受信できますが、定期的にメッセージを送信できません。

4

1 に答える 1

0

私が間違っていたことを理解しました。メッセージ ループは、既に接続されているため、RabbitMQ サーバーへの新しい接続を開くことができませんでした。すべてを単一の EventMachine ループに統合し、接続を再利用したところ、機能しました。

興味のある方は、次のようになります。

EM.run do

    AMQP.start(connection_config) do |connection|
        channel = AMQP::Channel.new(connection)

        EM.add_periodic_timer(5) { channel.default_exchange.publish("foo", :routing_key => 'queue2') }

        queue = channel.queue("queue1", :exclusive => false, :durable => true)
        channel.prefetch(1)
        queue.subscribe(:ack => true) do |metadata, payload|
            puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
            response = "bar"
            channel.default_exchange.publish(response,
                :routing_key    => metadata.reply_to,
                :correlation_id => metadata.message_id,
                :mandatory      => true)
            metadata.ack
        end
    end
    Signal.trap("INT")  { AMQP.stop { EM.stop } }
    Signal.trap("TERM") { AMQP.stop { EM.stop } }

end
于 2013-03-08T19:28:06.003 に答える