2

複数のアクションを通じてメッセージを同時に処理したいSpringIntegrationプロジェクトがあります。だから私はを設定しましpublish-subscribe-channeltask-executor。ただし、すべての処理が完了するのを待ってから次に進みます。どうすればいいですか?

<publish-subscribe-channel id="myPubSub" task-executor="my10ThreadPool"/>

<channel id="myOutputChannel"/>

<service-activator input-channel="myPubSub" output-channel="myOutputChannel"
         ref="beanA" method="blah"/>
<service-activator input-channel="myPubSub" output-channel="myOutputChannel"
         ref="beanB" method="blah"/>

<service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>

したがって、上記の場合、両方の作業が完了しafterThreadingProcessorた後で1回だけ呼び出されるようにします。ただし、上記では2回呼び出されます。beanAbeanBafterThreadingProcessor

4

2 に答える 2

4
  1. pub-subチャネルに追加します(これにより、、、、などのapply-sequence="true"デフォルトの相関データがメッセージに追加され、ダウンストリームコンポーネントでデフォルトの戦略を使用できるようになります)。correlationIdsequenceSizesequenceNumber

  2. <aggregator/>beforeを追加afterThreadingProcessorし、2つのからの出力をそれにルーティングします<service-activator/>

  3. アグリゲーターの後にを追加し<splitter/>ます-デフォルトのスプリッターは、アグリゲーターによって作成されたコレクションを2つのメッセージに分割します。

afterThreadingProcessor作業を完了する2番目のスレッドのメッセージごとに1回呼び出されます。

チェーンを使用すると、構成が簡単になります...

<chain input-channel="myOutputChannel">
    <aggregator />
    <splitter />
    <service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>
</chain>

Collection<?>最終サービスを1回呼び出すには、スプリッターを追加する代わりに、サービスを変更して取得します。

編集:

コメント#3(元のスレッドで最終サービスを実行する)でやりたいことを行うには、これが機能するはずです...

<int:channel id="foo" />
<int:service-activator ref="twoServicesGateway" input-channel="foo"
      output-channel="myOutputChannel" />

<int:gateway id="twoServicesGateway" default-request-channel="myPubSub"/>
    <int:publish-subscribe-channel id="myPubSub" task-executor="my10ThreadPool"
            apply-sequence="true"/>
    <int:service-activator input-channel="myPubSub" output-channel="aggregatorChannel"
         ref="beanA" method="blah"/>
    <int:service-activator input-channel="myPubSub" output-channel="aggregatorChannel"
         ref="beanB" method="blah"/>
    <int:aggregator input-channel="aggregatorChannel" />

<int:service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>

この場合、ゲートウェイは他の2つのサービスとアグリゲーターをカプセル化します。デフォルトservice-interfaceは単純RequestReplyExchangerです。呼び出し元のスレッドは出力を待ちます。アグリゲーターにはフレームワークがないためoutput-channel、フレームワークはゲートウェイに応答を送信し、待機中のスレッドはそれを受信します。に戻る<service-activator/>と、結果が最終サービスに送信されます。

reply-timeoutデフォルトでは、ゲートウェイは無期限に待機し、サービスの1つがnullを返した場合、集約された応答は受信されないため、ゲートウェイにを配置することをお勧めします。

ゲートウェイフローがゲートウェイから実行されることを示すために、ゲートウェイフローをインデントしたことに注意してください。これらは、ゲートウェイの子要素ではありません。

于 2013-02-27T15:03:50.947 に答える
0

EIP Scatter-Gatherパターンの実装として、Spring Integration 4.1.0で導入されたよりクリーンなアプローチを使用して、同じ種類の動作を実現できるようになりました。

チェックアウトスキャッター-サンプルの要点を収集します: https ://gist.github.com/noorulhaq/b13a19b9054941985109

于 2015-03-07T13:50:42.157 に答える