1

2 つの Java プロセスがあります。最初のプロセスはメッセージを生成し、ActiveMQ キューに入れます。2 番目のプロセス (コンシューマー) は、Spring Integration を使用してキューからメッセージを取得し、スレッドで処理します。

2 つの要件があります。

  1. コンシューマーには 3 つの処理スレッドが必要です。キューを介して 10 個のメッセージが入ってくる場合、最初の 3 つのメッセージを処理する 3 つのスレッドが必要で、残りの 7 つのメッセージはバッファリングする必要があります。

  2. 一部のメッセージがまだ処理されていない間にコンシューマーが停止した場合、再起動後にメッセージの処理を続行する必要があります。

これが私の設定です:

<bean id="messageActiveMqQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="example.queue" />
</bean>

<int-jms:message-driven-channel-adapter
    destination="messageActiveMqQueue" channel="incomingMessageChannel" />

<int:channel id="incomingMessageChannel">
    <int:dispatcher task-executor="incomingMessageChannelExecutor" />
</int:channel>

<bean id="incomingMessageChannelExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="daemon" value="false" />
    <property name="maxPoolSize" value="3" />
</bean>

<int:service-activator input-channel="incomingMessageChannel"
    ref="myMessageProcessor" method="processMessage" />

最初の要件は期待どおりに機能します。10 個のメッセージを生成し、3 つの myMessageProcessors がそれぞれメッセージの処理を開始します。最初のメッセージが終了するとすぐに、4 番目のメッセージが処理されます。

ただし、すべてのメッセージが処理される前にコンシューマーを強制終了すると、それらのメッセージは失われます。再起動後、コンシューマーはこれらのメッセージを再び受け取りません。

上記の構成では、ThreadPoolTask​​Executor によって生成されたスレッドがメッセージをキューに入れるためだと思います。したがって、メッセージは既に incomingMessageChannel から削除されています。したがって、incomingMessageChannelExecutor のキュー容量を設定してみました。

<property name="queueCapacity" value="0" />

しかし、3 つ以上のメッセージがあるとエラー メッセージが表示されるようになりました。

2013-06-12 11:47:52,670 WARN [org.springframework.jms.listener.DefaultMessageListenerContainer] - Execution of JMS message listener failed, and no ErrorHandler has been set.
org.springframework.integration.MessageDeliveryException: failed to send Message to channel 'incomingMessageChannel'

も に変更しようとしmessage-driven-channel-adapterましたinbound-gatewayが、これにより同じエラーが発生します。

inbound-gatewayエラーが ActiveMQ キューに戻るように、 にエラー ハンドラを設定する必要がありますか? ThreadPoolTask​​Executor に空きスレッドがない場合にメッセージがキューに保持されるようにキューを構成するにはどうすればよいですか?

前もって感謝します、

ベネディクト

4

1 に答える 1

2

いいえ; エグゼキューター チャネルを使用する代わりに、<message-driven-channel-adapter/>.

<dispatcher/>をチャネルから取り外し、concurrent-consumers="3"アダプタにセットします。

于 2013-06-12T10:41:23.607 に答える