3

次のように定義された春の統合フローを持つプロジェクトがあります。

@Bean
public IntegrationFlow validationIntegrationFlow( ValidationService<String> validationService, ReleaseStrategy bundleReleaseStrategy ) {
    return IntegrationFlows.from( FtpIntegrationFlowConfiguration.BANK_FTP_CHANNEL )                
            .split()
            .enrichHeaders( headerEnricherSpec -> {
                headerEnricherSpec.headerFunction( DESTINATION_CHANNEL_HEADER, message ->
                        validationService.validate( ( String ) message.getPayload() ) ?
                                BANK_FTP_VALID_CHANNEL : BANK_FTP_BAD_CHANNEL );
            } )
            .aggregate( aggregatorSpec -> aggregatorSpec
                    .correlationStrategy( message -> message.getHeaders().get( DESTINATION_CHANNEL_HEADER ) )
                    .releaseStrategy( bundleReleaseStrategy )
                    .expireGroupsUponCompletion( true )
                    .groupTimeout( 1000 )
                    .sendPartialResultOnExpiry( true ) )
            .route( new HeaderValueRouter( DESTINATION_CHANNEL_HEADER ) )
            .get();
}

これが行うことは、各メッセージを検証し、適切な宛先ヘッダーを割り当ててから、結果をグループに集約し、それをルーターに送信して正しいチャネルにディスパッチすることです

フローをチェックして、開始時に送信されるメッセージの数をログに記録するサービスがあり、2 つの宛先チャネルの後に、X サイズのバンドルが処理されたことをサービスに伝える 2 つのハンドルがあります。

以下は、取得したログのサンプルです

2016-12-09 17:04:00.176 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | synchronize | Start sync for correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49] with value [7956]
2016-12-09 17:04:01.397 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [6956]
2016-12-09 17:04:01.752 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [5956]
2016-12-09 17:04:02.114 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [4956]
2016-12-09 17:04:02.410 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [3956]
2016-12-09 17:04:02.681 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [2956]
2016-12-09 17:04:02.991 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [1956]
2016-12-09 17:04:03.344 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [956]
2016-12-09 17:04:05.538 | [taskScheduler-5] | INFO | org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler | expireGroup | Expiring MessageGroup with correlationKey[bad.channel]
2016-12-09 17:04:05.538 | [taskScheduler-5] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [955]
2016-12-09 17:04:05.590 | [taskScheduler-9] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [955] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [0]
2016-12-09 17:04:06.635 | [taskScheduler-2] | WARN | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | counter for [b5e69b73-aa99-4e9e-a7d2-59c015793a49] not found, ignoring subtract of [955]

すべてのメッセージが処理された後にわかるように、別のバンドルが作成されたことを示す ExpireGroup ログの後に警告を記録します。

アグリゲーターの前にログを置いてみましたが、そこに重複したメッセージは表示されません。重複しないようにアグリゲーターを構成するのを手伝ってくれる人はいますか?

ps:ここで役立つ場合は、バンドルリリース戦略をインスタンス化する方法です

new TimeoutCountSequenceSizeReleaseStrategy( 100, 1000L );
4

0 に答える 0