Spring DefaultMessageListenerContainer を、キューからメッセージを消費する ActiveMQ コンシューマーとして構成しました。これを「Test.Queue」と呼びましょう。このコードを 4 台の異なるマシンにデプロイし、すべてのマシンを同じ ActiveMQ インスタンスに構成して、同じ「Test.Queue」キューからのメッセージを処理します。
4 台のマシンすべてが起動して実行されるとすぐに、最大コンシューマー サイズを 20 に設定します。キューに対してカウントされるコンシューマーの数が 80 (4 * 最大コンシューマー サイズ = 80) であることがわかります。
生成されてキューに送信されるメッセージが高くなれば、すべて問題ありません。
1000 のメッセージがあり、80 のコンシューマーのうちの 1 つがスタックしているとします。これにより、Active MQ がフリーズし、他のコンシューマーへのメッセージの送信が停止します。
すべてのメッセージは ActiveMQ に永久にスタックされます。
最大 80 のコンシューマーを備えた 4 台のマシンがあるため、どのコンシューマーが確認できなかったのかはわかりません。
4台のマシンすべてを停止して再起動し、スタックした悪いコンシューマを持つマシンを停止すると、メッセージが再び流れ始めます。
DefaultMessageListenerContainer を構成して、不良なコンシューマーを破棄し、すぐに ActiveMQ にシグナルを送ってメッセージの送信を開始する方法がわかりません。
次のように、Spring がなくてもシナリオを作成できました。
- 最大 5000 のメッセージを生成し、「Test.Queue」キューに送信しました
2 つのコンシューマー (コンシューマー A、B) を作成し、1 つのコンシューマー B の onMessage() メソッドで、現在の時刻 % 13 が0 を指定すると、スレッドがスリープ状態になります。
これらの 2 つのコンシューマを実行しました。
- Active MQ に移動し、キューに 2 つのコンシューマーがあることがわかりました。
- A と B の両方がメッセージを処理しています
- ある時点で、消費者 B の onMessage() が呼び出され、現在時刻 % 13 is 0 の条件が満たされるとスレッドがスリープ状態になります。
- コンシューマー B はスタックしており、ブローカーに確認できません
- Active MQ Web コンソールに戻りましたが、まだコンシューマーが 2 と表示されていますが、メッセージはキューから取り出されていません。
- ここで、別のコンシューマー C を作成し、それを実行して消費しました。
- ActiveMQ のコンシューマー数だけが 2 から 3 に増えました。
- しかし、コンシューマ C は何も消費していません。ブローカは、コンシューマ B の承認をまだ待っているため、それらすべてを保持するメッセージの送信に失敗したからです。
- また、消費者Aが何も消費していないことに気付きました
- 私はコンシューマー B を殺します。これで、すべてのメッセージが排出されます。
A、B、C が Spring の DefaultMessageListenerContainer によって管理されているとしましょう。Spring の DefaultMessageListenerContainer を微調整して、X 秒間確認できなかった後、その悪い消費者 (私の場合は消費者 B) をプールから取り除くにはどうすればよいですか?ブローカーがメッセージを永久に保持していないこと。
御時間ありがとうございます。
この問題の解決策が得られたら感謝します。