1

私は、毎秒多くのメッセージを生成する jms プロデューサーを持っています。メッセージは amq 永続キューに送信され、それらを順次処理する必要がある単一のコンシューマーによって消費されます。しかし、プロデューサーはコンシューマーよりもはるかに高速であるように思われ、パフォーマンスとメモリの問題が発生しています。メッセージは非常にゆっくりと取得され、消費は間隔を置いて発生するようです (消費者はポーリング方式でメッセージを「要求」しますが、これは奇妙です!)

基本的にすべてが春の統合で起こります。プロデューサー側の構成は次のとおりです。最初のステーク メッセージは、stakesInMemoryChannel で受信され、そこから、filteredStakesChannel をスローしてフィルター処理され、そこから jms キューに入ります (executor を使用して、送信が別のスレッドで行われるようにします)。

    <bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="${jms.stakes.queue.name}" />
    </bean>

    <int:channel id="stakesInMemoryChannel" />

    <int:channel id="filteredStakesChannel" >
        <int:dispatcher task-executor="taskExecutor"/>
    </int:channel>

    <bean id="stakeFilterService" class="cayetano.games.stake.StakeFilterService"/>

    <int:filter 
       input-channel="stakesInMemoryChannel"
       output-channel="filteredStakesChannel" 
       throw-exception-on-rejection="false"
       expression="true"/>

    <jms:outbound-channel-adapter channel="filteredStakesChannel" destination="stakesQueue" delivery-persistent="true" explicit-qos-enabled="true"  />

    <task:executor id="taskExecutor" pool-size="100" />

他のアプリケーションは、このようなメッセージを消費しています... メッセージは、jms ステークス キューからステークス入力チャネルに送られ、その後、2 つの別々のチャネルにルーティングされます。1 つはメッセージを保持し、もう 1 つは他の処理を行います。これを「処理」と呼びましょう。 .

    <bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg name="name" value="${jms.stakes.queue.name}" />
</bean>

<jms:message-driven-channel-adapter 
    channel="stakesInputChannel" 
    destination="stakesQueue" 
    acknowledge="auto"
    concurrent-consumers="1"
    max-concurrent-consumers="1"
    />
<int:publish-subscribe-channel id="stakesInputChannel" />
<int:channel id="persistStakesChannel" />
<int:channel id="processStakesChannel" />

<int:recipient-list-router 
        id="customRouter" 
        input-channel="stakesInputChannel" 
        timeout="3000" 
        ignore-send-failures="true" 
        apply-sequence="true"
        >
    <int:recipient channel="persistStakesChannel"/>
    <int:recipient channel="processStakesChannel"/>
</int:recipient-list-router> 


<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
    <property name="queuePrefetch" value="${jms.broker.prefetch.policy}" />
</bean>

<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="${jms.broker.url}" />     
            <property name="prefetchPolicy" ref="prefetchPolicy" /> 
            <property name="optimizeAcknowledge" value="true" /> 
            <property name="useAsyncSend" value="true" />
        </bean>
    </property>
    <property name="sessionCacheSize" value="10"/>
    <property name="cacheProducers" value="false"/>
</bean>
4

2 に答える 2

1

「間隔を置いて(消費者はポーリング方式でメッセージを「要求」しますが、これは奇妙です」という意味がわかりません。

コンテナー スレッドは、ポーリングしているように "見える" かもしれませんが、そうではありません。メッセージが到着するか、タイムアウトになるまで、AMQ クライアントでブロックされます。タイムアウトになると、すぐに AMQ receive() に戻ります。

この構成は問題ないようです。スレッドが 1 つの場合、消費率はルーターの下流で何をしているかに直接依存します。

于 2012-11-21T14:53:23.500 に答える
0

PooledConnectionFactoryを使用することをお勧めします。これは Spring JmsTemplate での使用が推奨され、Connection、Session、および MessageProducer インスタンスをプールして、使用されなくなった後に返すことができるようにします。

消費者側で見られる「間隔」の動作は、消費者のタイムアウトだと思います。

Gary Russel が言ったことに反して、amq.receive()効果的にキューをポーリングします。Spring 構成はこれを隠しますが、メッセージは基本的に、キューのコンシューマーで受信を呼び出すループでキューから引き出されます。receive()キューのコンシューマーは、メッセージの取得を試みるために呼び出すまで、メッセージがキューにあるかどうかを知る方法がありません。

これは、メッセージが着信したときにアクションを実行するリスナーを登録するトピックの場合とは逆です。トピックは、メッセージを処理するリスナーを登録するため、洗練されたソリューションです。

トピックを使用すると、activemq にメッセージの処理方法を指示できます。キューを使用すると、activemq は、要求されたときにメッセージを提供するだけです。

于 2012-11-23T04:57:09.030 に答える