私は次の設定を持っています
@Bean
public IntegrationFlow mainFlow(){
return IntegrationFlows.from("channel1")
.route("condition", p -> p
.subFlowMapping("true", flow -> flow.gateway("channel2"))
.subFlowMapping("false" flow -> {
flow.gateway("channel3", c -> c.replyChannel("aggregatorOutputChannel"));
flow.gateway("channel4");
})
.get();
}
@Bean
public MessageChannel channel3()
{
return MessageChannels.publishSubscribe().applySequence(true).get();
}
@Bean
public IntegrationFlow flow1(){
return IntegrationFlows.from("channel3")
.transform(s -> {
System.out.println(s);
return s;
}
.channel("aggregatorInputChannel")
.get();
}
@Bean
public IntegrationFlow flow2(){
return IntegrationFlows.from("channel3")
.split()
.transform(s -> {
System.out.println(s);
return s;
}
.aggregate()
.channel("aggregatorInputChannel")
.get();
}
@Bean
public IntegrationFlow aggregatorFlow(){
return IntegrationFlows.from("aggregatorInputChannel")
.aggregate((AggregatorSpec s) -> s.groupTimeout(60000)
.outputProcessor(new AggregationMessageGroupProcessor())
.sendPartialResultOnExpiry(true)
.expireGroupsUponCompletion(true)
.releaseStrategy(new TimeoutOrSequenceCountComparatorReleaseStrategy(4, 60000)), null)
.gateway("postAggregation")
.channel("aggregatorOutputChannel")
.get();
}
チャネル 3 は pub-sub チャネルであり、メッセージを処理し、完了後に aggregatorInputChannel にメッセージを入れる 2 つのコンシューマーがあります。ただし、私が観察しているのは、メッセージが最終的に aggregatorOutputChannel に入れられたとしても (アグリゲーター Bean 定義を介してデバッグし、メッセージが aggregatorOutputChannel に入るのを見ることができます)、channel3 のゲートウェイがブロックされ、返されないことです。その結果、メッセージはチャネル 4 には送信されません。
ここで何か不足していますか?それとも何か間違ったことをしていますか?
AggregationMessageGroupProcessor は、aggregationPayloads メソッドをオーバーライドするだけのカスタム アグリゲーターです。
public class AggregationMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
@Override
protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
group.getOne().getPayload();
}
}
ありがとう!