1

次のワークフローがあります。

  1. インバウンドチャネル

  2. スプリッター

  3. 分割チャネルのタスク エグゼキュータ - すべてのスレッドが同じワークフローを実行します。

3.a. リクエストを構築する

3.b. ゲートウェイ メッセージ エンドポイントのサービス アクティベーター ラッパー。

3.c. error-channel の構成を使用した http-outbound-gateway のゲートウェイ ラッパー (http-outbound-gateway の呼び出し中に例外を処理するため)

3.d. http-アウトバウンド-ゲートウェイ

  1. アグリゲータ

  2. 春の統合ワークフローからの応答。

3.d で例外が発生した場合、ゲートウェイ エラー チャネル用に構成されたサービス アクティベーターに制御が移ります。失敗したメッセージから新しいヘッダー、エラー チャネルに渡されたヘッダーに次のコードだけをコピーします。

a. b.シーケンス番号 c.シーケンスサイズ

ただし、スプリッターの応答を集約している間、DefaultAggregatingMessageGroupProcessor.java は競合するヘッダーを削除し、それにより、アグリゲーターに制御を提供する前にエラー チャネルと応答チャネルを削除します。

したがって、アグリゲーターが操作を完了すると、応答またはエラー チャネルを見つけることができず、例外が発生します。

spring-integration-core バージョン 2.2.1 を使用していますが、ヘッダー アグリゲーション中に応答チャネルとエラー チャネルが削除される理由がわかりません。

この問題を解決するための情報は非常に役立ちます。

ありがとうございました :)

編集 1: このシナリオで私を助けてくれてありがとう、ゲイリー。現在の構成を共有しています

<!-- SPLITTER -->
<int:splitter id="dentalSplitter" ref="dentalServiceSplitter"
    method="getDentalServiceList" input-channel="dentalServiceSplitterChannel"
    output-channel="dentalSplitterTaskChannel" />

<int:channel id="dentalSplitterTaskChannel">
    <int:dispatcher task-executor="dentalTaskExecutor" />
</int:channel>

<int:chain input-channel="dentalSplitterTaskChannel" output-channel="dentalGatewayChannel">
    <int:header-enricher>
        <int:header name="CHAIN_START_TIME" expression="T(System).currentTimeMillis()" overwrite="true" />
    <int:object-to-json-transformer content-type="application/json"/>               
</int:chain>

<int:service-activator input-channel="dentalGatewayChannel" ref="dentalGatewayWrapper" output-channel="dentalReplyChannel" />
<int:gateway id="dentalGatewayWrapper" default-request-channel="dentalCostEstimateRequestChannel" error-channel="dentalErrorChannel"/>

<int-http:outbound-gateway id="dentalGateway"
    url-expression="@urlBuilder.build('${service.endpoint}')"  
    http-method="POST" request-factory="clientHttpRequestFactory"  
    request-channel="dentalCostEstimateRequestChannel" extract-request-payload="true" 
    expected-response-type="com.dental.test.DentalResponse">
    <int-http:request-handler-advice-chain> 
         <ref bean="logChainTimeInterceptor" /> 
    </int-http:request-handler-advice-chain>
</int-http:outbound-gateway>

<!-- EXCEPTION -->
<int:chain input-channel="dentalErrorChannel" output-channel="dentalAggregatorChannel">
    <int:transformer ref="commonErrorTransformer" method="dentalGracefulReturn"/>
</int:chain>

<!-- SUCCESS -->
<int:chain input-channel="dentalReplyChannel" output-channel="dentalAggregatorChannel">
        <int:filter discard-channel="dentalErrorChannel"
        expression="T(com.dental.util.InvocationOutcomeHelper).isOutcomeSuccess(payload?.metadata?.outcome?.code,payload?.metadata?.outcome?.message)" />
</int:chain>

<!-- AGGREGATION -->

<int:chain input-channel="dentalAggregatorChannel" output-channel="wsDentalServiceOutputChannel" >
    <int:aggregator ref="dentalServiceAggregator" />
    <int:service-activator ref="dentalResponseServiceActivator" />
</int:chain>

私が気付いたのは、ゲートウェイを通過するときにすべての分割チャネルがエラーと応答用の新しい一時チャネルを作成し、ゲートウェイから応答を取得した後、保存された (元の受信) エラーと応答チャネルヘッダーを保持することです。そして、あなたが言及したように、制御がエラートランスフォーマーに到達した後、保存されたヘッダーを保持するフローが壊れ、集約メッセージグループプロセッサが一時チャネルの 3 つの異なるインスタンスを受信するため、それらを削除します。カスタム メッセージ グループ プロセッサを用意し、ヘッダーを集約するための競合解決戦略を変更することを計画していたので、この構成を思いつきました。

<bean id="channelPreservingAggregatingMessageHandler" class="org.springframework.integration.aggregator.AggregatingMessageHandler">
    <constructor-arg name="processor" ref="channelPreservingMessageGroupProcessor"/>
</bean>

私はまだこれをテストしていません。しかし、この議論に基づくと、これは実行可能な解決策のようには見えません。

また、ゲートウェイでのエラー処理の構成が正しくないようです。ただし、「メッセージを直接転送する代わりに、エラーフローでエラーを処理し、結果を通常どおりゲートウェイの「ラッパー」に返す」というあなたの声明には混乱しています。エラー チャネルを削除すると、例外が発生したときにどのように制御を取り戻すことができますか? ここで何かを理解していない可能性があります。これについて詳しく教えてください。

4

1 に答える 1