rabbitmq を使用して RPC を構築しようとしています。
rabbitmq http://www.rabbitmq.com/tutorials/tutorial-six-ruby.htmlを使用して RPC を構築するためのチュートリアルによると、クライアントごとに 1 つの応答キューを使用し、correlation_id を使用して応答と要求をマッピングできます。correlation_id の使用方法について混乱していますか?
これが私が実行している問題です。2 つの異なる相関 ID を持つ同じ応答キューを使用して、1 つのクライアントから 2 つの rpc 呼び出しを同期的に作成したいと考えています。ただし、チュートリアルで読んだことから、各クライアントが rpc 呼び出しを順番に行っていると想定しているように見えるため、これが正しいユースケースであるかどうかはわかりません。(この場合、なぜここで correlation_id が必要なのか、さらに混乱します)。
これは、私が達成しようとしているコード例です (rpc_server.rb はチュートリアルと同じです)。うまくいけば、私の質問がより明確になります。
thr1 に設定すると、correlation_id が thr2 によって上書きされるため、以下のコード ブロックは機能しません。
とにかくそれを修正して機能させることはあるのだろうか?@reply_queue.subscribe ブロックを初期化から外して別の call_id を渡そうとしても、thr1 が終了するのを待っている間に @reply-queue がロックされるように見えるため、まだ機能しません。
質問が不明な場合はお知らせください。ご回答いただきありがとうございます。
#!/usr/bin/env ruby
# encoding: utf-8
require "bunny"
require "thread"
conn = Bunny.new(:automatically_recover => false)
conn.start
ch = conn.create_channel
class FibonacciClient
attr_reader :reply_queue
attr_accessor :response, :call_id
attr_reader :lock, :condition
def initialize(ch, server_queue)
@ch = ch
@x = ch.default_exchange
@server_queue = server_queue
@reply_queue = ch.queue("", :exclusive => true)
@lock = Mutex.new
@condition = ConditionVariable.new
that = self
@reply_queue.subscribe do |delivery_info, properties, payload|
if properties[:correlation_id] == that.call_id
that.response = payload.to_i
that.lock.synchronize{that.condition.signal}
end
end
end
def call(n)
self.call_id = self.generate_uuid
@x.publish(n.to_s,
:routing_key => @server_queue,
:correlation_id => call_id,
:reply_to => @reply_queue.name)
lock.synchronize{condition.wait(lock)}
response
end
protected
def generate_uuid
# very naive but good enough for code
# examples
"#{rand}#{rand}#{rand}"
end
end
client = FibonacciClient.new(ch, "rpc_queue")
thr1 = Thread.new{
response1 = client.call(30)
puts response1
}
thr2 = Thread.new{
response2 = client.call(40)
puts response2
}
ch.close
conn.close