各サブスクライバーがパブリッシャーから送信されたメッセージの処理を完了したことを確認するまで、パブリッシュ側でブロックしたいパブリッシュ-サブスクライブのユース ケースがあります。
私は (間違って?) ソリューションの一部として、RabbitMQ とその Java amqp-client の Channel.waitForConfirmsOrDie メソッドを使用できると想定していました。問題は、waitForConfirmsOrDie が実際にブロックするケースが見つからないことです。
javadocsによると、waitForConfirmsOrDie は次のようになっています。
最後の呼び出し以降に発行されたすべてのメッセージがブローカーによって確認または拒否されるまで待ちます。いずれかのメッセージが確認されなかった場合、waitForConfirmsOrDie は IOException をスローします。非確認チャネルで呼び出されると、すぐに戻ります。
この方法が実際に機能することをテストするために、RabbitMQ ウェブサイト のサンプル コードから始めました。
このコード例では、パブリッシャーとコンシューマーをそれぞれ別のスレッドで作成します。次に、コンシューマーがメッセージを消費している間、パブリッシャーは取引所にメッセージを送信します。パブリッシャーは、waitForConfirmsOrDie() への呼び出しを介してすべてのメッセージが確認されるまでブロックすることになっているようです。
このコード例は、私がやろうとしていたことと完全に一致しているように見えました。しかし、思ったようにうまくいかないようです。実際、コンシューマー スレッドで自動応答メッセージをオフにしても、waitForConfirmsOrDie() はすぐに戻ります。
ch.queueDeclare(QUEUE_NAME, false, false, false, null);
1 つのfalse
を true に変更するだけで自動確認をオフにしました
ch.queueDeclare(QUEUE_NAME, true, false, false, null);
。これは、消費者が ack を送信する必要がなくなったことを意味すると思います。
では、waitForConfirmsOrDie() は実際に何をするのでしょうか? いつブロックされますか?
waitForConfirmsOrDie が私が望むことをしない場合、続行する前にすべてのサブスクライバーがメッセージを承認するまでパブリッシャーを待機させる方法はありますか?