2 つの Java プロセスがあります。最初のプロセスはメッセージを生成し、ActiveMQ キューに入れます。2 番目のプロセス (コンシューマー) は、Spring Integration を使用してキューからメッセージを取得し、スレッドで処理します。
2 つの要件があります。
コンシューマーには 3 つの処理スレッドが必要です。キューを介して 10 個のメッセージが入ってくる場合、最初の 3 つのメッセージを処理する 3 つのスレッドが必要で、残りの 7 つのメッセージはバッファリングする必要があります。
一部のメッセージがまだ処理されていない間にコンシューマーが停止した場合、再起動後にメッセージの処理を続行する必要があります。
これが私の設定です:
<bean id="messageActiveMqQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="example.queue" />
</bean>
<int-jms:message-driven-channel-adapter
destination="messageActiveMqQueue" channel="incomingMessageChannel" />
<int:channel id="incomingMessageChannel">
<int:dispatcher task-executor="incomingMessageChannelExecutor" />
</int:channel>
<bean id="incomingMessageChannelExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="daemon" value="false" />
<property name="maxPoolSize" value="3" />
</bean>
<int:service-activator input-channel="incomingMessageChannel"
ref="myMessageProcessor" method="processMessage" />
最初の要件は期待どおりに機能します。10 個のメッセージを生成し、3 つの myMessageProcessors がそれぞれメッセージの処理を開始します。最初のメッセージが終了するとすぐに、4 番目のメッセージが処理されます。
ただし、すべてのメッセージが処理される前にコンシューマーを強制終了すると、それらのメッセージは失われます。再起動後、コンシューマーはこれらのメッセージを再び受け取りません。
上記の構成では、ThreadPoolTaskExecutor によって生成されたスレッドがメッセージをキューに入れるためだと思います。したがって、メッセージは既に incomingMessageChannel から削除されています。したがって、incomingMessageChannelExecutor のキュー容量を設定してみました。
<property name="queueCapacity" value="0" />
しかし、3 つ以上のメッセージがあるとエラー メッセージが表示されるようになりました。
2013-06-12 11:47:52,670 WARN [org.springframework.jms.listener.DefaultMessageListenerContainer] - Execution of JMS message listener failed, and no ErrorHandler has been set.
org.springframework.integration.MessageDeliveryException: failed to send Message to channel 'incomingMessageChannel'
も に変更しようとしmessage-driven-channel-adapter
ましたinbound-gateway
が、これにより同じエラーが発生します。
inbound-gateway
エラーが ActiveMQ キューに戻るように、 にエラー ハンドラを設定する必要がありますか? ThreadPoolTaskExecutor に空きスレッドがない場合にメッセージがキューに保持されるようにキューを構成するにはどうすればよいですか?
前もって感謝します、
ベネディクト