0

EasyNetQ を使用して、RabbitMQ から raw バイト メッセージを同期的に消費する方法はありますか?

EasyNetQ 形式で公開されていないシステムからのメッセージの順序どおりの処理と確認応答を保証する必要があります。コンシューマーが単一のスレッドで実行されることはわかっていますが、IAdvancedBusインターフェースは生のメッセージを消費するためのメソッドを 1 つしか提供していません。

IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

戻り値のTask型は、コンシューマーがコールバックを非同期で実行しているため、メッセージを順不同で処理する可能性があることを意味します。

そうでない場合、これをサポートするためにコードを変更するためのアイデアはありますか? インターフェイスメソッドを作成します:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);

で実装しRabbitAdvancedBusますが、コードが正確にどこに行くのかわかりません。

4

2 に答える 2

0

これは興味深い質問です。私自身は EasyNetQ の専門家ではありません。おそらく他の誰かが来て、より良い回答を提供してくれるでしょう。しかし、私はEasyNetQ コード ベースに約 1 年間精通しており、私の意見では、コンシューマを配線するときに (したがってコンシューマが呼び出されるときに) 何が起こっているのかを理解するのは難しいと思います。

メソッドのシグネチャを変更しただけでは、メッセージが順番どおりに処理されるとは限りません。たとえば、提案されたインターフェースのこの実装を見てください。

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
    Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
    {
        onMessage(bytes, properties, info);
        return new Task(() => { });
    };
    Consume(queue, taskWrapper);
}

元のメソッドを呼び出しますが、Consumeその後どうなるかはわかりませんよね?

私があなたの立場であれば、次のいずれかを行います。

  1. 公式の RabbitMq クライアントを使用して、そこからメッセージを消費します (それほどトリッキーではありません!)
  2. たぶん、私が貢献してきたRabbitMqの上の薄いレイヤーであるRawRabbitを見てください(vNext標準を使用)メッセージを消費するための非同期署名のみをサポートしますが、( AsyncExSubscriber.csのような同期ライブラリを使用して)同期実装を作成することは難しくありません。
  3. ビジネス ロジックのモデリングを変更します。これがあなたのケースに当てはまるかどうかはわかりませんが、一般に、すべてのメッセージが正しい順序で処理されることがミッションクリティカルである場合は、consume メソッドがこのメッセージが次のものであることを確認できるようにモデル化する必要があります列をなして。(さらに、EasyNetQ はメッセージ シーケンスを保証するとは思わないので、フレームワークの新しいバージョンごとに検証することをお勧めします)。

お役に立てれば!

于 2015-11-18T07:59:21.553 に答える