1

私は次の設定を持っています

@Bean
public IntegrationFlow mainFlow(){
 return IntegrationFlows.from("channel1")
        .route("condition", p -> p
                .subFlowMapping("true", flow -> flow.gateway("channel2"))
                .subFlowMapping("false" flow -> {
                    flow.gateway("channel3", c -> c.replyChannel("aggregatorOutputChannel"));
                    flow.gateway("channel4");
                 })
        .get();
}

@Bean
public MessageChannel channel3()
{
    return MessageChannels.publishSubscribe().applySequence(true).get();
}

@Bean
public IntegrationFlow flow1(){
  return IntegrationFlows.from("channel3")
     .transform(s -> {
        System.out.println(s);
        return s;
      }
     .channel("aggregatorInputChannel")
     .get();
}

@Bean
public IntegrationFlow flow2(){
  return IntegrationFlows.from("channel3")
     .split()
     .transform(s -> {
        System.out.println(s);
        return s;
      }
     .aggregate()
     .channel("aggregatorInputChannel")
     .get();
}

@Bean
public IntegrationFlow aggregatorFlow(){
        return IntegrationFlows.from("aggregatorInputChannel")
                .aggregate((AggregatorSpec s) -> s.groupTimeout(60000)
                        .outputProcessor(new AggregationMessageGroupProcessor())
                        .sendPartialResultOnExpiry(true)
                        .expireGroupsUponCompletion(true)
                        .releaseStrategy(new TimeoutOrSequenceCountComparatorReleaseStrategy(4, 60000)), null)
                .gateway("postAggregation")
                .channel("aggregatorOutputChannel")
                .get();
}

チャネル 3 は pub-sub チャネルであり、メッセージを処理し、完了後に aggregatorInputChannel にメッセージを入れる 2 つのコンシューマーがあります。ただし、私が観察しているのは、メッセージが最終的に aggregatorOutputChannel に入れられたとしても (アグリゲーター Bean 定義を介してデバッグし、メッセージが aggregatorOutputChannel に入るのを見ることができます)、channel3 のゲートウェイがブロックされ、返されないことです。その結果、メッセージはチャネル 4 には送信されません。

ここで何か不足していますか?それとも何か間違ったことをしていますか?

AggregationMessageGroupProcessor は、aggregationPayloads メソッドをオーバーライドするだけのカスタム アグリゲーターです。

public class AggregationMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {

  @Override
  protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
      group.getOne().getPayload();
  }
}

ありがとう!

4

0 に答える 0