3

ActiveMQ5.6.0を実行しています。テスト環境では、静的ネットワークで3つのブローカーが動作しています。これが現在のシナリオです。6人の消費者が3つのブローカーにランダムに接続しています。1つのブローカーには3つのコンシューマーがあり、2つ目は2つ、3つ目は1つです。キューにメッセージを積み上げると、メッセージが1つのコンシューマーを持つ3つ目のブローカーにバックログされ、他の2つのブローカーには何も与えられないことがわかります。バックログの内、残りの5つのコンシューマーはアイドル状態です。

以下に、すべてのブローカー(dev.queue01)の構成を示します。他の2つは、静的ホスト名の適切な変更と同様です。

アイドル状態のコンシューマーが消費するために、メッセージが他のブローカーに自動的に配信されることを期待します。問題の説明で何か見落としがあったかどうか教えてください。ご指導ありがとうございます。

http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq- core.xsd ">

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="prd.queue01" dataDirectory="${activemq.data}">

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" producerFlowControl="false" memoryLimit="1mb"> 
              <pendingSubscriberPolicy>
                <vmCursor />
              </pendingSubscriberPolicy>
            </policyEntry>
            <policyEntry queue=">" producerFlowControl="false" memoryLimit="64mb" optimizedDispatch="true" enableAudit="false" prioritizedMessages="true"> 
              <networkBridgeFilterFactory>
                <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
              </networkBridgeFilterFactory>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>

    <managementContext>
        <managementContext createConnector="true"/>
    </managementContext>

    <persistenceAdapter>
        <amqPersistenceAdapter directory="${activemq.data}/data/amqdb"/>
    </persistenceAdapter>

      <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage limit="256 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="750 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="750 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" updateClusterClients="true" updateClusterClientsOnRemove="true" rebalanceClusterClients="true"/>
    </transportConnectors>

    <networkConnectors>
      <networkConnector uri="static:(tcp://dev.queue02:61616,tcp://dev.queue03:61616)" name="queues_only" conduitSubscriptions="false" decreaseNetworkConsumerPriority="false" networkTTL="4">
      <dynamicallyIncludedDestinations>
        <queue physicalName=">"/> 
      </dynamicallyIncludedDestinations>
      <excludedDestinations>
        <topic physicalName=">"/> 
      </excludedDestinations>
    </networkConnector>
</networkConnectors>


</broker>
<import resource="jetty.xml"/>

4

3 に答える 3

3

遅い答えですが、うまくいけば、それは将来の読者に役立つかもしれません。

ブローカーのネットワークリングについて説明しました。ここでは、B1、B2、およびB3がすべて相互に通信し、B1に3つのコンシューマー(C1〜C3)、B2に2つのコンシューマー(C4およびC5)、1つのコンシューマー(C6)があります。 B3で。メッセージがどこで生成されているか(どのブローカーに最初に行くか)については説明していませんが、B3だとしましょう。(B3は、説明に最も正確に一致する最悪のシナリオを生成しますが、メッセージが生成される場所に関係なく、負荷が不均一になります。)

B3には、C6、B1、およびB2の3つの接続されたコンシューマーがあります。そのブローカーはそれらのコンシューマー間でメッセージをラウンドロビンするため、メッセージの1/3はC6に、1/3はB1に、1/3はB2に送信されます。

B1には、C1、C2、C3、B2、およびB3の5つの接続されたコンシューマーがあります。ただし、メッセージは送信元と同じブローカーに配信されないため、B3からのメッセージをカウントするコンシューマーはC1、C2、C3、およびB2の4つです。したがって、メッセージ全体の1/3のうち、C1、C2、およびC3はそれぞれ1/4(合計の1/12)を取得し、B2は合計の同じ1/12を取得します。これについてはすぐに詳しく説明します。

B2には、C4、C5、B1、およびB3の4つのコンシューマーが接続されています。ただし、メッセージは送信元と同じブローカーに配信されないため、B3からのメッセージをカウントするコンシューマーはC4、C5、およびB1の3つです。したがって、合計メッセージの1/3のうち、C4とC5はそれぞれ1/3(合計の1/9)を取得し、B1は合計の同じ1/9を取得します。これについてもすぐに詳しく説明します。

これまでのところ、C6は合計メッセージの1/3を取得し、C1-C3は合計メッセージの1/12を取得し、C4-C5は合計メッセージの1/9を取得し、1/12 + 1/9 = 7 2番目のブローカーにルーティングされたメッセージ全体の/36。今、それらのメッセージに戻りましょう。

B3-> B1-> B2パス(全体の1/12)をたどったメッセージのうち、C4とC5の間でラウンドロビンされます(メッセージは元のブローカーB3に戻ることができないため)。それぞれの合計メッセージの1/24が追加されます。したがって、C4とC5は合計の1/9 + 1/24=11/72を受け取ります。

同様に、B3-> B2-> B1パス(全体の1/9)をたどったメッセージのうち、C1、C2、およびC3間でラウンドロビンされるため、C1、C2、およびC3は受信します。 1/12 +1/27=合計の13/108。

B3-> B1-> B2-> B3パス(全体の1/36)をたどったメッセージのうち、半分はC6(全体の1/72)に行き、半分はB1(1/72合計)。同様に、B3-> B2-> B1-> B3パス(全体の1/36)をたどったメッセージのうち、半分はC6(全体の1/72)に行き、半分はB2(1/合計72)。したがって、C6はメッセージの1/36(合計13/36)を取得し、B1は合計の1/72を取得し、B2は合計の1/72を取得します。

現在、収穫逓減が進んでいますが、C6がメッセージ全体の大きなシェア(36%)を獲得し、B1に接続している消費者(最も多くの消費者がいる)がそれぞれ小さなシェアを獲得していることがわかります。 (10%未満)、結果として、C6には多くの作業があり、C1-5にははるかに少ない作業があり、観察したようにアイドル状態になっています。また、一部のメッセージがネットワークを介して長いパスをたどり、待ち時間が長くなる可能性があることもわかりますが、それはあなたの質問の内容ではありません。

于 2014-08-01T21:54:30.990 に答える
1

私があなたを正しく理解していれば、ブローカーはここでキューを意味します。

  • すべてのブローカーは同じタイプのオブジェクトを持っています。
  • すべての消費者は同じ種類のプロセスを実行します。
  • そして、消費者間でワークロードを均等に共有したいと考えています。
  • 操作の順序はそれほど重要ではありません。

ActiveMQ5.5.1でも同じことをしようとしました。私がしたのは、1つのキューを作成し、複数のコンシューマーを作成することだけでした。私はすべての消費者を同じキューに向けました。

Active-MQが自動的に配布を処理しました。

私は次の例を観察しました:

キューがある場合-2000レコードあります。2つのコンシューマーを同時にこのキューにポイントすると、1番目のコンシューマーは0から始まるオブジェクトを処理します。2番目のコンシューマーはランダムなオフセット(たとえば700から)の後にオブジェクトの処理を開始します。

1番目のコンシューマーが0〜700のオブジェクトの処理を完了し、2番目のコンシューマーが200レコード(700〜900)を処理すると、1番目のコンシューマーは任意のランダムオフセット(1200の場合があります)からオブジェクトの取得を開始できます。

オフセットの調整は、ActiveMQによって自動的に制御されました。

私はこれを観察しました。私はこれが起こると確信しています。

私があなたの質問に答えたことを願っています(または少なくともあなたの問題を正しく理解しました)。

ここで私が理解していなかったのは、Active-MQがQUEUESを作成する場合、その間のどこかからオブジェクトをどのように提供したかということでした。

于 2013-01-04T18:39:14.387 に答える
1

よくわからないので、はるかにフェッチしますが、設定ではすべてのトピックが除外されています

<excludedDestinations>
    <topic physicalName=">"/> 
</excludedDestinations>

テストのためにその制限を取り除くことができますか?Activemqは、アドバイザリトピックを使用して、クライアントが特定のキュー/トピックに接続するときに通信します。したがって、アドバイザリトピックをブロックしたため、3番目のブローカーが他のクライアントについて知らない可能性があります。

于 2012-12-29T13:15:05.687 に答える