0

Q1 と Q2 の 2 つのキューを持つ単一の activemq ブローカーがある状況があります。Activemessaging を使用する 2 つの Ruby ベースのコンシューマがあります。それらをC1とC2と呼びましょう。両方のコンシューマーが各キューをサブスクライブします。各キューにサブスクライブするときに activemq.prefetchSize=1 を設定しています。ack=client も設定しています。

次の一連のイベントを検討してください。

1) 長時間実行ジョブをトリガーするメッセージがキュー Q1 にパブリッシュされます。これを M1 と呼びます。

2) M1 がコンシューマー C1 にディスパッチされ、長い操作が開始されます。

3) 短いジョブをトリガーする 2 つのメッセージがキュー Q2 に発行されます。これらを M2 および M3 と呼びます。

4) M2 は C2 にディスパッチされ、C2 は短時間のジョブを迅速に実行します。

5) C1 がまだ M1 を実行しているにもかかわらず、M3 が C1 にディスパッチされます。prefetchSize=1 が接続ではなくキュー サブスクリプションに設定されているため、C1 にディスパッチできます。したがって、Q1 メッセージが既にディスパッチされているという事実は、1 つの Q2 メッセージのディスパッチを停止しません。

activemessaging コンシューマーはシングルスレッドであるため、最終的には、C1 が M1 の処理を​​完了するまで、M3 は長時間 C1 を待機します。そのため、コンシューマー C2 がアイドル状態であるにもかかわらず (メッセージ M2 ですぐに終了するため)、M3 は長時間処理されません。

基本的に、Q1 の長いジョブが実行され、Q2 の短いジョブが大量に作成されると、Q2 の短いジョブの 1 つだけがコンシューマーでスタックし、Q1 の長いジョブが完了するのを待機します。

サブスクリプション レベルではなく接続レベルで prefetchSize を設定する方法はありますか? M1 を処理している間、C1 にメッセージをディスパッチしたくありません。もう 1 つの方法は、Q1 の処理専用のコンシューマーを作成してから、Q2 の処理専用の別のコンシューマーを作成することです。しかし、Q1 のメッセージはめったにないので、私はそれをしたくありません。Q1 の専用のコンシューマーは、ほとんどの時間、メモリを拘束してアイドル状態になります。

4

2 に答える 2

2

activemq.prefetchSize は、拡張 Stomp ヘッダーに関する ActiveMQ ドキュメント ( http://activemq.apache.org/stomp.html ) によると、CONNECT ではなく、SUBSCRIBE メッセージでのみ使用できます。関連情報は次のとおりです。

動詞: 申し込む

ヘッダー: activemq.prefetchSize

タイプ: 整数

説明: クライアントにディスパッチされる保留メッセージの最大数を指定します。この最大数に達すると、クライアントがメッセージを確認するまでメッセージはディスパッチされません。メッセージの処理が遅くなる可能性があるコンシューマー間でメッセージを非常に公平に分散するには、1 に設定します。

これに関する私の読書と経験は、M1がACKされていないため(クライアントACKがオンになっているため)、このM1はサブスクリプションに設定されたprefetchSize = 1によって許可される1つのメッセージである必要があるということです。うまくいかなかったと聞いて驚いていますが、おそらくもっと詳細なテストを実行する必要があります。設定は、必要な動作に合わせて正しくする必要があります。

activemq のディスパッチについて他の人から不安定だと聞いたので、これは使用しているバージョンのバグである可能性があります。

1 つの提案として、ネットワーク トラフィックをスニッフィングして、M1 が何らかの理由で ack されているかどうかを確認するか、いくつかの puts ステートメントを ruby​​ stomp gem に投入して、通信を監視することをお勧めします (これは、私が通常、ストンプの問題のデバッグ)。

これを試す機会があれば、自分の結果でコメントを更新します。

1 つの提案: 複数の長い処理メッセージが送信される可能性が非常に高く、長い処理メッセージの数がプロセスの数を超えた場合は、この修正で迅速な処理メッセージが待機することになります。

私は、少なくとも 1 つの専用プロセスを高速なジョブを実行するか、別の言い方をすれば、より長いジョブを実行する一連のプロセスを専用にする傾向があります。すべてのポーラー コンシューマー プロセスが long と short の両方をリッスンすると、dispatch が何を行っても最適ではない結果になる可能性があります。プロセス グループは、送信先のサブセットをリッスンするようにコンシューマを構成する方法です: http://code.google.com/p/activemessaging/wiki/Configuration

processor_group 名、*list_of_processors

A processor group is a way to run the poller to only execute a subset of

ポーラーのコマンド ライン引数でグループの名前を渡すことにより、プロセッサを指定します。

You specify the name of the processor as its underscored lowercase

バージョン。したがって、プロセッサ グループに FooBarProcessor と BarFooProcessor がある場合、次のようになります。

    ActiveMessaging::Gateway.define do |s|
      ...
      s.processor_group :my_group, :foo_bar_processor, :bar_foo_processor
    end

The processor group is passed into the poller like the following:

    ./script/poller start -- process-group=my_group
于 2010-06-03T22:51:30.143 に答える
0

ActiveMessaging がこれをサポートしているかどうかはわかりませんが、長い処理メッセージが到着したときに他のコンシューマーのサブスクライブを解除し、処理後にそれらを再サブスクライブすることができます。

それはあなたに望ましい効果を与えるはずです。

于 2010-06-04T00:05:38.957 に答える