6

Spring DefaultMessageListenerContainer を、キューからメッセージを消費する ActiveMQ コンシューマーとして構成しました。これを「Test.Queue」と呼びましょう。このコードを 4 台の異なるマシンにデプロイし、すべてのマシンを同じ ActiveMQ インスタンスに構成して、同じ「Test.Queue」キューからのメッセージを処理します。

4 台のマシンすべてが起動して実行されるとすぐに、最大コンシューマー サイズを 20 に設定します。キューに対してカウントされるコンシューマーの数が 80 (4 * 最大コンシューマー サイズ = 80) であることがわかります。

生成されてキューに送信されるメッセージが高くなれば、すべて問題ありません。

1000 のメッセージがあり、80 のコンシューマーのうちの 1 つがスタックしているとします。これにより、Active MQ がフリーズし、他のコンシューマーへのメッセージの送信が停止します。

すべてのメッセージは ActiveMQ に永久にスタックされます。

最大 80 のコンシューマーを備えた 4 台のマシンがあるため、どのコンシューマーが確認できなかったのかはわかりません。

4台のマシンすべてを停止して再起動し、スタックした悪いコンシューマを持つマシンを停止すると、メッセージが再び流れ始めます。

DefaultMessageListenerContainer を構成して、不良なコンシューマーを破棄し、すぐに ActiveMQ にシグナルを送ってメッセージの送信を開始する方法がわかりません。

次のように、Spring がなくてもシナリオを作成できました。

  1. 最大 5000 のメッセージを生成し、「Test.Queue」キューに送信しました
  2. 2 つのコンシューマー (コンシューマー A、B) を作成し、1 つのコンシューマー B の onMessage() メソッドで、現在の時刻 % 13 が0 を指定すると、スレッドがスリープ状態になります。

  3. これらの 2 つのコンシューマを実行しました。

  4. Active MQ に移動し、キューに 2 つのコンシューマーがあることがわかりました。
  5. A と B の両方がメッセージを処理しています
  6. ある時点で、消費者 B の onMessage() が呼び出され、現在時刻 % 13 is 0 の条件が満たされるとスレッドがスリープ状態になります。
  7. コンシューマー B はスタックしており、ブローカーに確認できません
  8. Active MQ Web コンソールに戻りましたが、まだコンシューマーが 2 と表示されていますが、メッセージはキューから取り出されていません。
  9. ここで、別のコンシューマー C を作成し、それを実行して消費しました。
  10. ActiveMQ のコンシューマー数だけが 2 から 3 に増えました。
  11. しかし、コンシューマ C は何も消費していません。ブローカは、コンシューマ B の承認をまだ待っているため、それらすべてを保持するメッセージの送信に失敗したからです。
  12. また、消費者Aが何も消費していないことに気付きました
  13. 私はコンシューマー B を殺します。これで、すべてのメッセージが排出されます。

A、B、C が Spring の DefaultMessageListenerContainer によって管理されているとしましょう。Spring の DefaultMessageListenerContainer を微調整して、X 秒間確認できなかった後、その悪い消費者 (私の場合は消費者 B) をプールから取り除くにはどうすればよいですか?ブローカーがメッセージを永久に保持していないこと。

御時間ありがとうございます。

この問題の解決策が得られたら感謝します。

4

1 に答える 1

3

試してみるいくつかのオプションがあります...

  1. キューのプリフェッチを 0 に設定して、コンシューマー間での分散を促進し、特定のコンシューマーでの「スタック」メッセージを減らします。http://activemq.apache.org/what-is-the-prefetch-limit-for.htmlを参照してください

  2. 接続で「?useKeepAlive=false&wireFormat.maxInactivityDuration=20000」を設定して、指定された非アクティブ時間の後に低速コンシューマーをタイムアウトにします

  3. キュー ポリシー「slowConsumerStrategy->abortSlowConsumer」を設定します...再び遅いコンシューマーをタイムアウトにします

    <policyEntry ...
      ...
      <slowConsumerStrategy>
          <abortSlowConsumerStrategy />
      </slowConsumerStrategy>
      ...
    </policyEntry> 
    
于 2012-08-21T16:01:49.880 に答える