トピック交換でパブリッシュ/サブスクライブ メッセージング パターンを実装する RabbitMQ を使用して小さなクラスを作成しました。この pub/sub の上に、メソッドとプロパティがあります。
void Send(Message, Subject) - サブスクライバーが処理できるように、宛先トピックにメッセージをパブリッシュします。
MessageReceivedEvent - このメッセージング インスタンスでメッセージ受信イベントをサブスクライブします (メッセージング インスタンスは、作成時に目的のサブスクライブ トピックにバインドされます)。
SendWaitReply(Message, Subject) - メッセージを送信し、送信されたメッセージ ID (またはタイムアウト) と一致する相関 ID を持つ応答メッセージが受信されるまでブロックします。これは基本的に、pub/sub パターンに基づくリクエスト/リプライまたは RPC メカニズムです。
私が選択したメッセージング パターンは、システムの設計方法により、多少決まっています。SendWaitReply の潜在的な問題を軽減するために返信先キューを使用できることはわかっていますが、それはいくつかの要件を破っています。
現在、私の問題は次のとおりです。
Listen イベントの場合、リスナーが単一のスレッドで実行されるため、メッセージはイベント サブスクライバーを介して同期的に処理されます。これにより、大量のメッセージを処理する際 (つまり、Web API からのイベントを消費するバックエンド プロセス内) に深刻なパフォーマンスの問題が発生します。イベントをサブスクライブしてから、タスクまたはスレッドプールを使用してコールバックのコレクションを並行してディスパッチするのではなく、コールバック関数を渡すことを検討しています。スレッドの安全性は明らかに呼び出し元の関心事です。これが正しいアプローチかどうかはわかりません。
SendWaitReply イベントについては、メッセージ リスナー ループからすべての受信メッセージを取得し、空でない相関 GUID が含まれている場合はそれらを ConcurrentDictionary に配置するハック ソリューションと思われるものを作成しました。次に、SendWaitReply メソッドで、ConcurrentDictionary をポーリングして、送信されたメッセージの Id と一致するキーを含むメッセージを探します (または一定期間後にタイムアウトします)。これを行うためのより高速でより良い方法がある場合は、本当に調査したいと思います。おそらく、現在ブロックされているすべての SendWaitReply メソッドに、新しいメッセージが利用可能であり、継続的にポーリングする代わりにすべての Id をチェックする必要があることを通知する方法でしょうか?
2014 年 10 月 15 日更新
多くの徹底的な調査の結果、RabbitMQ または AMQP の範囲でSendWaitReplyについて上記で提示した特定のユースケースを直接処理するための「公式の」メカニズム/ヘルパー/ライブラリは存在しないという結論に達しました。当分の間、現在のソリューションに固執します (より堅牢な実装を調査します)。提供された RPC 機能を使用することを推奨する回答がありましたが、残念ながら、これはリクエストごとに排他的なコールバック キューを使用する場合にのみ機能します。これは、同じトピック交換ですべてのメッセージ (要求と応答) を表示するという私の主要な要件の 1 つを破っています。
さらに明確にするために、SendWaitReply要求の典型的なメッセージ ペアは次の形式です。
- Topic_Exchange.Service_A => some_command => Topic_Exchange.Service_B
- Topic_Exchange.Service_B => some_command_reply => Topic_Exchange.Service_A
これにより、 Topic_Exchange.#にリスナーを設定するだけで、さまざまなサービスを介して非常に深い「コール スタック」をトレースするためのすべてのシステム トラフィックを確認できる、強力なデバッグおよびロギング手法が得られます。