4

AMQP gem doc で提供されている RabbitMQ RPC サンプル コードをいじり始め、同期リモート呼び出しを実行する非常に単純なコードを記述しようとしています。

require "amqp"

module RPC
  class Base
    include EM::Deferrable

    def rabbit(rabbit_callback)
      rabbit_loop = Proc.new {
        AMQP.connect do |connection|
          AMQP::Channel.new(connection) do |channel|
            channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue|
              self.callback(&rabbit_callback)
              self.succeed(connection, channel, requests_queue)
            end # requests_queue
          end # AMQP.channel
        end # AMQP.connect

        Signal.trap("INT")  { connection.close { EM.stop } }
        Signal.trap("TERM") { connection.close { EM.stop } }
      }

      if !EM.reactor_running?
        EM.run do
          rabbit_loop.call
        end
      else
        rabbit_loop.call
      end
    end
  end

  class Server < Base

    def run
      server_loop = Proc.new do |connection, channel, requests_queue|
        consumer = AMQP::Consumer.new(channel, requests_queue).consume
        consumer.on_delivery do |metadata, payload|
          puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
          channel.default_exchange.publish(Time.now.to_s,
                                           :routing_key    => metadata.reply_to,
                                           :correlation_id => metadata.message_id,
                                           :mandatory      => true)
          metadata.ack
        end
      end
      rabbit(server_loop)
    end

  end

  class Client < Base

    def sync_push(request)
      result = nil
      sync_request = Proc.new do |connection, channel, requests_queue|
        message_id = Kernel.rand(10101010).to_s

        response_queue = channel.queue("", :exclusive => true, :auto_delete => true)
        response_queue.subscribe do |headers, payload|
          if headers.correlation_id == message_id
            result = payload
            connection.close { EM.stop }
          end
        end

        EM.add_timer(0.1) do 
          puts "[request] Sending a request...#{request} with id #{message_id}"
          channel.default_exchange.publish(request,
                                           :routing_key => requests_queue.name,
                                           :reply_to    => response_queue.name,
                                           :message_id  => message_id)
        end
      end

      rabbit(sync_request)
      result
    end
  end
end

アイデアは非常に単純です。メッセージ キューを常に準備しておく必要があります (これはrabbitメソッドによって処理されます)。クライアントが要求を送信する場合は常に、メッセージ ID とともに応答用の一時キューを作成することから始めます。次に、リクエストをメイン メッセージ キューにパブリッシュし、一時キューで同じメッセージ ID を持つレスポンスを待機して、この特定のリクエストに対する応答の準備が整ったことを確認します。一時キューとはどういうわけか冗長であると思いmessage_idます(キューも一意である必要があるため)。

このクライアント/サーバーコードを使用してダミースクリプトを実行します

# server session
>> server = RPC::Server.new
=> #<RPC::Server:0x007faaa23bb5b0>
>> server.run
Updating client properties
[requests] Got a request 3315740. Sending a reply to amq.gen-QCv8nP2dI5Qd6bg2Q1Xhk0...

# client session
>> client = RPC::Client.new
=> #<RPC::Client:0x007ffb6be6aed8>
>> client.sync_push "test 1"
Updating client properties
[request] Sending a request...test 1 with id 3315740
=> "2012-11-02 21:58:45 +0100"
>> client.sync_push "test 2"
AMQ::Client::ConnectionClosedError: Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007ffb6b9c83d0 @payload="\x002\x00\n\x00\x00\x00\f\x00\x00\x00\x00", @channel=1>

私が本当に理解していない2つのポイントがあります:

  1. EventMachine に関連:コードで、メッセージを実際に公開したい場合Client、なぜ呼び出さなければならないのですか? EM.add_timerそして、なぜ使用EM.next_tickしてもうまくいかないのですか?ここで publish が呼び出されたとき、「すべて」は「準備ができている」はずです。
  2. AMQP に関連: 2 番目の要求の接続が閉じられたためにクライアントがクラッシュするのはなぜですか? 新しいリクエストがプッシュされるたびに、まったく新しい EM/AMQP ループが作成されるはずです。

残念ながら、オンラインで EM/AMQP を扱うコードはほとんどありません。この効率性に関するコメントも大歓迎です。

4

1 に答える 1

3

ドキュメントを掘り下げるとonce_declared、クライアントがキューの使用を開始したときにキューの準備が整っていることを確認するために、コールバックが実際に必要であることが最終的にわかりました。

接続の問題に関しては、どういうわけか、 を使用するEM::Deferrableと問題が発生するように思われるため、(非常に不十分な) 解決策は単純に を含めないことEM::Deferrableです。

require "amqp"

module RPC

  module Base

    def rabbit(rabbit_callback)
      rabbit_loop = Proc.new {
        AMQP.start do |connection|
          AMQP::Channel.new(connection) do |channel|
            channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue|
              requests_queue.once_declared do
                rabbit_callback.call(connection, channel, requests_queue)
              end
            end
          end
        end

        Signal.trap("INT")  { AMQP.stop { EM.stop } }
        Signal.trap("TERM") { AMQP.stop { EM.stop } }
      }

      if !EM.reactor_running?
        @do_not_stop_reactor = false
        EM.run do
          rabbit_loop.call
        end
      else
        @do_not_stop_reactor = true
        rabbit_loop.call
      end
    end
  end

  class Server
    include Base

    def run
      server_loop = Proc.new do |connection, channel, requests_queue|
        consumer = AMQP::Consumer.new(channel, requests_queue).consume
        consumer.on_delivery do |metadata, payload|
          puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
          channel.default_exchange.publish(Time.now.to_s,
                                           :routing_key    => metadata.reply_to,
                                           :correlation_id => metadata.message_id,
                                           :mandatory      => true)
          metadata.ack
        end
      end
      rabbit(server_loop)
    end

  end

  class Client
    include Base

    def sync_push(request)
      result = nil
      sync_request = Proc.new do |connection, channel, requests_queue|
        message_id = Kernel.rand(10101010).to_s

        response_queue = channel.queue("", :exclusive => true, :auto_delete => true)
        response_queue.subscribe do |headers, payload|
          if headers.correlation_id == message_id
            result = payload
            AMQP.stop { EM.stop unless @do_not_stop_reactor }
          end
        end

        response_queue.once_declared do
          puts "[request] Sending a request...#{request} with id #{message_id}"
          channel.default_exchange.publish(request,
                                           :routing_key => requests_queue.name,
                                           :reply_to    => response_queue.name,
                                           :message_id  => message_id)
        end
      end

      rabbit(sync_request)
      result
    end
  end
end
于 2012-11-06T19:25:07.797 に答える