Spring JMS と Active MQ (5.6) でシステムを構築しています。これには、約 12 個の Spring Default Message Listener Containers (それぞれ最大 20 個の同時インスタンス) がすべて同じアクティブな mq 宛先 (キュー) に接続されています。
システムは、各ハンドラー (コンテナー) が自分自身にアドレス指定されたキューからメッセージを取得し、セレクターを使用して作業を行い、すべての作業が完了するまでメッセージをキューに戻すことによって機能します。
それぞれが 9 つの異なるハンドラーを通過する必要がある 25,000 のメッセージを送信するベンチマーク テストを行っています。
テストを実行するたびに、約 11300 のメッセージのみがすべてのハンドラーを通過しますが、アクティブな MQ はそれ以上メッセージを送信しません。
現在のテストの最後に、キューの次の統計を確認できます。 Enqueue Count: 120359 Dequeue Count: 106693 Dispatch Count: 106693 Inflight Count: 0 Queue Size: 13666
ブローカを再起動しない限り、アクティブな MQ はメッセージをディスパッチしません。
以下は私のactive-mq設定です:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
<bean id="propertyConfigurer" class="org.springframework.web.context.support.ServletContextPropertyPlaceholderConfigurer" />
<!-- The <broker> element is used to configure the ActiveMQ broker. -->
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="jmsDeployMqBroker" dataDirectory="${java.io.tmpdir}/activemq-data"
destroyApplicationContextOnStop="true" useJmx="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="false">
</policyEntry>
<policyEntry queue=">" producerFlowControl="false">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<destinations>
<queue physicalName="handlersDest"/>
<topic physicalName="notificationsDest" />
<queue physicalName="ActiveMQ.DLQ" />
</destinations>
<!-- The managementContext is used to configure how ActiveMQ is exposed
in JMX. By default, ActiveMQ uses the MBean server that is started by the
JVM. For more information, see: http://activemq.apache.org/jmx.html -->
<managementContext>
<managementContext createConnector="false" />
</managementContext>
<!-- Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag). For more information,
see: http://activemq.apache.org/persistence.html -->
<persistenceAdapter>
<amq:kahaPersistenceAdapter directory="${java.io.tmpdir}/activemq-data/kahadb" maxDataFileLength="1g" />
</persistenceAdapter>
<!-- The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html -->
<transportConnectors>
<transportConnector name="openwire" uri="${org.apache.activemq.brokerURL}" />
</transportConnectors>
</broker>
</beans>
そして、これはハンドラーの私の春の設定のサンプルです:
<jee:jndi-lookup id="connectionFactory" jndi-name="${jndi.jms.connfactory}">
<jee:environment>
java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
java.naming.provider.url = ${jndi.jms.naming.url}
</jee:environment>
</jee:jndi-lookup>
<!-- ID must not change as it is used in autowiring the handlers -->
<jee:jndi-lookup id="handlersDest" jndi-name="${jndi.docprod.queue}">
<jee:environment>
java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
java.naming.provider.url = ${jndi.jms.naming.url}
${jndi.queue.setup}
</jee:environment>
</jee:jndi-lookup>
<!-- ID must not change as it is used in autowiring the handlers -->
<jee:jndi-lookup id="notificationsDest" jndi-name="${jndi.docprod.topic}">
<jee:environment>
java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
java.naming.provider.url = ${jndi.jms.naming.url}
${jndi.topic.setup}
</jee:environment>
</jee:jndi-lookup>
<bean id="dmsReadContainer" class="mydomain.DocProdnMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:handlerClass="mydomain.DmsReadHandler"
p:messageListener-ref="dmsReadHandler"
p:destination-ref="handlersDest" >
<property name="concurrentConsumers"><value>${dmsRead.initialInstances}</value></property>
<property name="maxConcurrentConsumers"><value>${dmsRead.maxInstances}</value></property>
<property name="idleConsumerLimit"><value>${dmsRead.idleInstances}</value></property>
</bean>
<bean id="dmsReadHandler" class="mydomain.DmsReadHandler">
</bean>
...
ActiveMQ のログ ファイルには、ディスパッチを停止する理由を示唆するような異常は何も表示されません。
これ以上メッセージが送信されない理由を知っている人、または問題をさらに診断するための提案がある人はいますか?