パブリッシュ/サブスクライブ チャネルを使用して RESTful リクエストが 2 つのプロバイダーに転送される、非常に単純な統合フローがあります。両方の RESTful サービスからの結果は、1 つの配列に集約されます。統合フローのスケッチは次のとおりです。
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
).subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class)
)
)
)
.aggregate()
.get();
}
ただし、私のコードを実行すると、結果の配列には RESTful サービスの 1 つだけから返された項目が含まれます。不足している構成手順はありますか?
アップデート
次のバージョンは、Artem のコメントを考慮して、完全なソリューションに対応しています。
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel-scatter")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather"))
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather")))
.get();
}
@Bean
IntegrationFlow gatherFlow() {
return IntegrationFlows.from("inputChannel-gather")
.aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>(
g.getMessages().stream()
.flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
.collect(Collectors.toList()).toArray(new ItemDTO[0]))))
.get();
}