3

メッセージをリッスンするコンシューマーがあります。メッセージのフローがコンシューマーが処理できる以上の場合、このコンシューマーの別のインスタンスを開始したいと思います。

しかし、消費者からの情報をポーリングできるようにしたいので、ファンアウト交換を使用してすべてのプロデューサーがRPC呼び出しを取得できるように、RPCを使用してプロデューサーにこの情報を要求できると考えました。

私の質問は、まず第一にこれは可能であり、第二にそれは合理的ですか?

4

2 に答える 2

4

質問が「RPCメッセージを複数のサーバーに送信することは可能ですか?」である場合。答えはイエスです。

RPC呼び出しを作成するときは、メッセージに一時キューを添付します(通常はheader.reply_toにありますが、内部メッセージフィールドを使用することもできます)。これは、RPCターゲットが回答を公開するキューです。

RPCを単一のサーバーに送信すると、一時キューで複数のメッセージを受信できます。これは、RPC応答が次の方法で形成される可能性があることを意味します。

  • 単一のソースからの単一のメッセージ
  • 単一のソースからの複数のメッセージ
  • 複数のソースからの複数のメッセージ

このシナリオで発生する問題は次のとおりです。

  • いつ聞くのをやめますか?RPCサーバーの数がわかっている場合は、それぞれが応答を送信するまで待つことができます。それ以外の場合は、何らかの形式のタイムアウトを実装する必要があります。
  • 答えの出所を追跡する必要がありますか?この情報を保持するために、メッセージにいくつかの特別なフィールドを追加できます。メッセージの順序についても同じです。

それを行う方法を示すためのいくつかのコード(PythonとPikaライブラリ)。注意してください。これは完璧にはほど遠いです。最大の問題は、新しい回答が得られたときにタイムアウトをリセットする必要があることです。

    def consume_rpc(self, queue, result_len=1, callback=None, timeout=None, raise_timeout=False):
        if timeout is None:
            timeout = self.rpc_timeout

        result_list = []

        def _callback(channel, method, header, body):
            print "### Got 1/%s RPC result" %(result_len)
            msg = self.encoder.decode(body)
            result_dict = {}
            result_dict.update(msg['content']['data'])
            result_list.append(result_dict)

            if callback is not None:
                callback(msg)

            if len(result_list) == result_len:
                print "### All results are here: stopping RPC"
                channel.stop_consuming()

        def _outoftime():
            self.channel.stop_consuming()
            raise TimeoutError

        if timeout != -1:
            print "### Setting timeout %s seconds" %(timeout)
            self.conn_broker.add_timeout(timeout, _outoftime)

        self.channel.basic_consume(_callback, queue=queue, consumer_tag=queue)

        if raise_timeout is True:
            print "### Start consuming RPC with raise_timeout"
            self.channel.start_consuming()
        else:
            try:
                print "### Start consuming RPC without raise_timeout"
                self.channel.start_consuming()
            except TimeoutError:
                pass

        return result_list
于 2012-10-03T09:08:12.780 に答える
1

いくつかの調査の後、これは不可能のようです。RabbitMQ.comのチュートリアルを見ると、呼び出しのIDがあり、私が理解している限り、これは消費されていることがわかります。

私は別の方法を選択しました。それは、ログファイルを読み取り、データを集約することです。

于 2012-09-18T13:32:41.277 に答える