次のように定義された春の統合フローを持つプロジェクトがあります。
@Bean
public IntegrationFlow validationIntegrationFlow( ValidationService<String> validationService, ReleaseStrategy bundleReleaseStrategy ) {
return IntegrationFlows.from( FtpIntegrationFlowConfiguration.BANK_FTP_CHANNEL )
.split()
.enrichHeaders( headerEnricherSpec -> {
headerEnricherSpec.headerFunction( DESTINATION_CHANNEL_HEADER, message ->
validationService.validate( ( String ) message.getPayload() ) ?
BANK_FTP_VALID_CHANNEL : BANK_FTP_BAD_CHANNEL );
} )
.aggregate( aggregatorSpec -> aggregatorSpec
.correlationStrategy( message -> message.getHeaders().get( DESTINATION_CHANNEL_HEADER ) )
.releaseStrategy( bundleReleaseStrategy )
.expireGroupsUponCompletion( true )
.groupTimeout( 1000 )
.sendPartialResultOnExpiry( true ) )
.route( new HeaderValueRouter( DESTINATION_CHANNEL_HEADER ) )
.get();
}
これが行うことは、各メッセージを検証し、適切な宛先ヘッダーを割り当ててから、結果をグループに集約し、それをルーターに送信して正しいチャネルにディスパッチすることです
フローをチェックして、開始時に送信されるメッセージの数をログに記録するサービスがあり、2 つの宛先チャネルの後に、X サイズのバンドルが処理されたことをサービスに伝える 2 つのハンドルがあります。
以下は、取得したログのサンプルです
2016-12-09 17:04:00.176 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | synchronize | Start sync for correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49] with value [7956]
2016-12-09 17:04:01.397 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [6956]
2016-12-09 17:04:01.752 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [5956]
2016-12-09 17:04:02.114 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [4956]
2016-12-09 17:04:02.410 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [3956]
2016-12-09 17:04:02.681 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [2956]
2016-12-09 17:04:02.991 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [1956]
2016-12-09 17:04:03.344 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [956]
2016-12-09 17:04:05.538 | [taskScheduler-5] | INFO | org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler | expireGroup | Expiring MessageGroup with correlationKey[bad.channel]
2016-12-09 17:04:05.538 | [taskScheduler-5] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [955]
2016-12-09 17:04:05.590 | [taskScheduler-9] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [955] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [0]
2016-12-09 17:04:06.635 | [taskScheduler-2] | WARN | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | counter for [b5e69b73-aa99-4e9e-a7d2-59c015793a49] not found, ignoring subtract of [955]
すべてのメッセージが処理された後にわかるように、別のバンドルが作成されたことを示す ExpireGroup ログの後に警告を記録します。
アグリゲーターの前にログを置いてみましたが、そこに重複したメッセージは表示されません。重複しないようにアグリゲーターを構成するのを手伝ってくれる人はいますか?
ps:ここで役立つ場合は、バンドルリリース戦略をインスタンス化する方法です
new TimeoutCountSequenceSizeReleaseStrategy( 100, 1000L );