1

2 つのスレッドがあり、スレッド内の各メソッドが例外をスローします。各スレッドでスローされたすべてのエラーを取得するにはどうすればよいですか? このコードでは、エラー チャネルはエラーの 1 つをキャッチするだけです。基本的に私の目標は、すべてのエラーをキャッチして呼び出し元 (残りのコントローラー) に送信することです。どんな助けでも大歓迎です。ありがとう。

統合.java

public IntegrationFlow provisionUserFlow() {
return IntegrationFlows.from("input.channel")
  .publishSubscribeChannel(Executors.newCachedThreadPool(),
      s -> s
            .subscribe(f -> f.handle(provisionerA, "provision"))
            .subscribe(f -> f.handle(provisionerB, "provision"))
  .get();
}

@ServiceActivator( inputChannel = "errorChannel", outputChannel = "replyChannel")

public boolean processErrors(Exception message) throws RuntimeException{

System.out.println("Message" + message.getMessage());
System.out.println ("******************************");

throw new RuntimeException(message.getMessage());
}

MGateway.java

@MessagingGateway(errorChannel = "errorChannel")
public interface MGateway {

@Gateway(requestChannel = "input.channel", replyChannel = "replyChannel") 
boolean invokeProvisioner(User user);
}

解決

@Bean
public IntegrationFlow provisionUserFlow() {
return
    IntegrationFlows.from("input.channel")
    .publishSubscribeChannel(Executors.newCachedThreadPool(),
        s -> s.applySequence(true)
            .subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                .handle(provisionerA, "provision")
                .channel("aggregatorChannel")
            )
            .subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                .handle(provisionerB, "provision")
                .channel("aggregatorChannel"))
            )
        .get();
}

@Bean
public IntegrationFlow aggregateFlow() {
return IntegrationFlows.from("aggregatorChannel")
    .channel( aggregatorChannel)
    .aggregate( a -> a.processor( collect, "aggregatingMethod"))
    .get();
}

@Transformer( inputChannel = "errorChannel", outputChannel = "aggregatorChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {

Message<?> failedMessage =  ((MessagingException) errorMessage.getPayload())
    .getFailedMessage();

Exception exception = (Exception) errorMessage.getPayload();

return  MessageBuilder.withPayload( exception.getMessage())
       .copyHeadersIfAbsent( failedMessage.getHeaders() )
       .build();
}
4

1 に答える 1