2

同じ「メイン」スレッドで DefaultMessageListenerContainer を実行したい場合があります。現在、メッセージを受信するたびに新しいスレッドを生成する SimpleAsyncTaskExecutor を使用しています。

異なる分散システムに接続して処理を行うテスト ケースがあり、最終的にいくつかのことをアサートします。DefaultMessageListenerContainer は別のスレッドで実行されるため、メイン スレッドは返され、DefaultMessageListenerContainer が完了する前にアサーションの実行を開始します。これは、テスト ケースの失敗につながります。回避策として、メイン スレッドを数秒間スリープさせました。

サンプル構成

  <int-jms:message-driven-channel-adapter
        id="mq.txbus.publisher.channel.adapter"
        container="defaultMessageListenerContainer"
        channel="inbound.endpoint.publisher"
        acknowledge="transacted"
        extract-payload="true" />

  <beans:bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <beans:property name="connectionFactory" ref="mockConnectionFactory"/>
    <beans:property name="destination" ref="publisherToTxmQueue"/>
    <beans:property name="taskExecutor" ref="taskExecutor"/>
    <beans:property name="maxMessagesPerTask" value="10"/>
    <beans:property name="sessionTransacted" value="true"/>
</beans:bean>

<beans:bean id="taskExecutor" class="org.springframework.scheduling.timer.TimerTaskExecutor" />

シングルスレッドを作成するため、ここで TimerTaskExecutor を使用しようとしていますが、そのスレッドはメインスレッドとは別であるため、問題は解決されていません。SyncTaskExecutor を使用してみましたが、どちらも機能しません (または、正しいプロパティ値を提供していない可能性があります)。

回答:
SimpleMessageListenerContainer を使用してこの問題を解決しました。これが新しい構成です

     <int-jms:message-driven-channel-adapter
        id="mq.txbus.publisher.channel.adapter"
        container="messageListenerContainer"
        channel="inbound.endpoint.publisher"
        acknowledge="transacted"
        extract-payload="true" />

<beans:bean id="messageListenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">     
    <beans:property name="connectionFactory" ref="mockConnectionFactory"/>     
    <beans:property name="destination" ref="publisherToTxmQueue"/>
    <beans:property name="sessionTransacted" value="true"/>
    <beans:property name="exposeListenerSession" value="false"/>
</beans:bean> 
4

2 に答える 2

1

まず、は本質的に非同期であり、ブロックしないことを理解する必要があります。これは、メッセージをキューに送信すると、別のスレッドによって、場合によっては別のマシンで処理されることを意味し、コンシューマーがダウンしている場合は数分または数時間後に処理される可能性があります。

テスト ケースの説明を読むと、システム/統合テストを行っているように見えます。残念ながら、待機以外にできることはあまりありませんが、やみくもに待つべきではありません。テストが遅くなるだけでなく、安定性も低下するためです。ビジー状態のシステムや長い GC プロセスでどれだけ待っても、テストはまだ時間のかかる可能性があります。エラーがないのにアウト。

したがって、一定の秒数スリープする代わりに、たとえば ~100 ミリ秒スリープし、メッセージの処理が完了したときにのみ満たされる条件を確認します。たとえば、メッセージを処理してデータベースにレコードを挿入する場合は、定期的にデータベースをチェックしてください。

よりエレガントな方法 (ビジーな待機なし) は、リクエスト/リプライ パターンを実装することです。JMS でリクエスト レスポンスを実装するにはどうすればよいですか? を参照してください。実装の詳細については。基本的に、メッセージを送信するときは、応答キューを定義し、そのキューでメッセージを待機するブロックを作成します。元のメッセージの処理が完了すると、コンシューマーは応答メッセージを定義されたキューに送信する必要があります。そのメッセージを受け取ったら、すべてのアサーションを実行します。

于 2012-05-07T16:26:26.613 に答える
0

テスト目的の場合は、jms アクティビティが完了したらアサーションを実行する CyclicBarrier を使用しないでください。

于 2012-05-08T19:58:34.747 に答える