メッセージをリッスンするコンシューマーがあります。メッセージのフローがコンシューマーが処理できる以上の場合、このコンシューマーの別のインスタンスを開始したいと思います。
しかし、消費者からの情報をポーリングできるようにしたいので、ファンアウト交換を使用してすべてのプロデューサーがRPC呼び出しを取得できるように、RPCを使用してプロデューサーにこの情報を要求できると考えました。
私の質問は、まず第一にこれは可能であり、第二にそれは合理的ですか?
質問が「RPCメッセージを複数のサーバーに送信することは可能ですか?」である場合。答えはイエスです。
RPC呼び出しを作成するときは、メッセージに一時キューを添付します(通常はheader.reply_toにありますが、内部メッセージフィールドを使用することもできます)。これは、RPCターゲットが回答を公開するキューです。
RPCを単一のサーバーに送信すると、一時キューで複数のメッセージを受信できます。これは、RPC応答が次の方法で形成される可能性があることを意味します。
このシナリオで発生する問題は次のとおりです。
それを行う方法を示すためのいくつかのコード(PythonとPikaライブラリ)。注意してください。これは完璧にはほど遠いです。最大の問題は、新しい回答が得られたときにタイムアウトをリセットする必要があることです。
def consume_rpc(self, queue, result_len=1, callback=None, timeout=None, raise_timeout=False):
if timeout is None:
timeout = self.rpc_timeout
result_list = []
def _callback(channel, method, header, body):
print "### Got 1/%s RPC result" %(result_len)
msg = self.encoder.decode(body)
result_dict = {}
result_dict.update(msg['content']['data'])
result_list.append(result_dict)
if callback is not None:
callback(msg)
if len(result_list) == result_len:
print "### All results are here: stopping RPC"
channel.stop_consuming()
def _outoftime():
self.channel.stop_consuming()
raise TimeoutError
if timeout != -1:
print "### Setting timeout %s seconds" %(timeout)
self.conn_broker.add_timeout(timeout, _outoftime)
self.channel.basic_consume(_callback, queue=queue, consumer_tag=queue)
if raise_timeout is True:
print "### Start consuming RPC with raise_timeout"
self.channel.start_consuming()
else:
try:
print "### Start consuming RPC without raise_timeout"
self.channel.start_consuming()
except TimeoutError:
pass
return result_list
いくつかの調査の後、これは不可能のようです。RabbitMQ.comのチュートリアルを見ると、呼び出しのIDがあり、私が理解している限り、これは消費されていることがわかります。
私は別の方法を選択しました。それは、ログファイルを読み取り、データを集約することです。