私は、毎秒多くのメッセージを生成する jms プロデューサーを持っています。メッセージは amq 永続キューに送信され、それらを順次処理する必要がある単一のコンシューマーによって消費されます。しかし、プロデューサーはコンシューマーよりもはるかに高速であるように思われ、パフォーマンスとメモリの問題が発生しています。メッセージは非常にゆっくりと取得され、消費は間隔を置いて発生するようです (消費者はポーリング方式でメッセージを「要求」しますが、これは奇妙です!)
基本的にすべてが春の統合で起こります。プロデューサー側の構成は次のとおりです。最初のステーク メッセージは、stakesInMemoryChannel で受信され、そこから、filteredStakesChannel をスローしてフィルター処理され、そこから jms キューに入ります (executor を使用して、送信が別のスレッドで行われるようにします)。
<bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="${jms.stakes.queue.name}" />
</bean>
<int:channel id="stakesInMemoryChannel" />
<int:channel id="filteredStakesChannel" >
<int:dispatcher task-executor="taskExecutor"/>
</int:channel>
<bean id="stakeFilterService" class="cayetano.games.stake.StakeFilterService"/>
<int:filter
input-channel="stakesInMemoryChannel"
output-channel="filteredStakesChannel"
throw-exception-on-rejection="false"
expression="true"/>
<jms:outbound-channel-adapter channel="filteredStakesChannel" destination="stakesQueue" delivery-persistent="true" explicit-qos-enabled="true" />
<task:executor id="taskExecutor" pool-size="100" />
他のアプリケーションは、このようなメッセージを消費しています... メッセージは、jms ステークス キューからステークス入力チャネルに送られ、その後、2 つの別々のチャネルにルーティングされます。1 つはメッセージを保持し、もう 1 つは他の処理を行います。これを「処理」と呼びましょう。 .
<bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="${jms.stakes.queue.name}" />
</bean>
<jms:message-driven-channel-adapter
channel="stakesInputChannel"
destination="stakesQueue"
acknowledge="auto"
concurrent-consumers="1"
max-concurrent-consumers="1"
/>
<int:publish-subscribe-channel id="stakesInputChannel" />
<int:channel id="persistStakesChannel" />
<int:channel id="processStakesChannel" />
<int:recipient-list-router
id="customRouter"
input-channel="stakesInputChannel"
timeout="3000"
ignore-send-failures="true"
apply-sequence="true"
>
<int:recipient channel="persistStakesChannel"/>
<int:recipient channel="processStakesChannel"/>
</int:recipient-list-router>
<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="queuePrefetch" value="${jms.broker.prefetch.policy}" />
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.broker.url}" />
<property name="prefetchPolicy" ref="prefetchPolicy" />
<property name="optimizeAcknowledge" value="true" />
<property name="useAsyncSend" value="true" />
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
<property name="cacheProducers" value="false"/>
</bean>