3

私は JMS/ActiveMQ を初めて使用し、ActiveMQ のキューからメッセージを取得し、永続化のためにこれらのメッセージを処理する Spring/Hibernate アプリケーションを持っています。メッセージの処理と永続化には時間がかかるため、複数のメッセージを同時に処理できるように、DefaultMessageListenerContainer を複数のコンシューマー (例: 5 ~ 10) に設定しました。私は ActiveMQ と Spring API のドキュメントをたくさん見てきましたが、必要なのは maxConcurrentConsumers を 10 に設定し、concurrentConsumers を 5 に設定するか、DefaultMessageListenerContainer で同時実行数を 5 ~ 10 に設定するだけでよいと考えました。これを行うと、ActiveMQ の組み込みコンソールから、キューに実際に 5 つのコンシューマーがあることがわかります。しかし、キューに 10 個または 100 個のメッセージをドロップすると、処理はシングルスレッドのようで、ログ行を追加してスレッド ID を出力しました。すべての要求を順番に処理する同じスレッド ID のようです。コンソールの ActiveMQ の Queues ページから、[Browse Active Consumer] リンクをクリックして何が起こっているかを確認すると、1 つのコンシューマーには 100 件のメッセージすべてが保留中で、他の 4 つのメッセージには何もないように見えます。

私はいくつかの調査を行い、Spring からこの記事を見つけました ( http://forum.springsource.org/showthread.php?61170-Messages-missed-using-DefaultMessageListenerContainer)、すべてのコンシューマが 1000 メッセージにサインアップしていると考えて、値が 2 のプリフェッチ ポリシーを追加しました。メッセージの別のバッチを送信すると、1 つのコンシューマーに 2 ~ 3 個のメッセージが保留されますが、他の 4 つのコンシューマーはアイドル状態のままになり、すべてが最終的にその 1 つのコンシューマーによって順次処理されます。この時点で、おそらく ActiveMQ ブローカーの設定が間違っているのではないかと思いました。デフォルトのディスパッチ ポリシーはラウンド ロビン戦略であることをドキュメントで読みましたが、activemq.xml で constantPendingMessageLimitStrategy と呼ばれる設定が 1000 に設定されているのを見て、それを非常に低い数値 (例: 2) に設定しようとしました。ブローカーは一度に消費者に送信しましたが、それでも何もしませんでした. うまくいけば、誰かが私が間違っていることを指摘できます. 私の春の設定を以下に投稿しましたが、その1つの設定(constantPendingMessageLimitStrategy)を試す以外は、activemq.xmlにまったく触れていません。ActiveMQ 5.8 を使用しています。

<bean id="importRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="initialRedeliveryDelay" value="15000" />
    <property name="maximumRedeliveries" value="-1" />
    <property name="useExponentialBackOff" value="true" />
    <property name="backOffMultiplier" value="2" />
</bean>

<bean id="importPrefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
    <property name="all" value="2"></property>
</bean>

<bean id="importConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${import.queue.url}"/>
    <property name="redeliveryPolicy" ref="importRedeliveryPolicy" />
    <property name="prefetchPolicy" ref="importPrefetchPolicy"></property>
</bean>

<bean id="importQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="${import.queue.name}" />
</bean>

<bean id="importListener" class="com.mycompany.ImportQueueListener" >
    <property name="importService" ref="importService"></property>
    <property name="sessionFactory" ref="sessionFactory"/>
</bean>

<bean id="importJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
   <property name="connectionFactory" ref="importConnectionFactory" />
    <property name="destination" ref="importQueue" />
    <property name="messageListener" ref="importListener" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="10"></property>
    <property name="concurrentConsumers" value="5"></property>
</bean>
4

2 に答える 2

0
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd">




 <!--  <bean class="org.apache.activemq.command.ActiveMQQueue" id="destination">  
     <constructor-arg value="TEST.Q1"></constructor-arg>  
  </bean>-->

  <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="TOPIC_NAME" />
  </bean>



 <bean class="org.springframework.jms.core.JmsTemplate" id="producerTemplate">
  <property name="connectionFactory" ref="connectionFactory"/>
   <property name="defaultDestination" ref="destination"/>
 </bean>  

 <!--ActiveMq broker URL configured here-->
   <bean class="org.apache.activemq.ActiveMQConnectionFactory" id="connectionFactory" >
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
   </bean>  

    <!--producer configured here-->

   <bean class="Producer" id="simpleMessageProducer">  
       <property name="jmsTemplate" ref="producerTemplate"></property>  
   </bean> 


    <!--listeners configured here-->

     <bean class="Consumer" id="simpleMessageListener">  

      </bean>  
     <bean class="ConsumerSecond" id="simpleMessageListenerSecond">   </bean> 



     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="jmsContainer"> 
     <property name="connectionFactory" ref="connectionFactory"></property>  
     <property name="destination" ref="destination"></property>  
     <property name="messageListener" ref="simpleMessageListener"></property>  


    </bean> 
     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="jmsContainer1"> 
     <property name="connectionFactory" ref="connectionFactory"></property>  
     <property name="destination" ref="destination"></property>
     <property name="messageListener" ref="simpleMessageListenerSecond"></property>

    </bean>  





</beans>
于 2014-04-04T10:51:47.317 に答える
0

ActiveMQConnectionFactoryyourを poolに置き換えてみてください:

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy method="stop">
<property name="connectionFactory">
  <bean class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
      <value>tcp://localhost:61616</value>
      </property>
   </bean>
  </property>
</bean>
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
   <property name="connectionFactory">
    <ref local="jmsFactory"/>
  </property>
</bean>
于 2013-04-16T13:58:52.530 に答える