5

パブリッシュ/サブスクライブ チャネルを使用して RESTful リクエストが 2 つのプロバイダーに転送される、非常に単純な統合フローがあります。両方の RESTful サービスからの結果は、1 つの配列に集約されます。統合フローのスケッチは次のとおりです。

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                ).subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class)
                        )
                )
            )
            .aggregate()
            .get();
}

ただし、私のコードを実行すると、結果の配列には RESTful サービスの 1 つだけから返された項目が含まれます。不足している構成手順はありますか?

アップデート

次のバージョンは、Artem のコメントを考慮して、完全なソリューションに対応しています。

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel-scatter")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather"))
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather")))
            .get();
}

@Bean
IntegrationFlow gatherFlow() {
    return IntegrationFlows.from("inputChannel-gather")
            .aggregate(a -> a.outputProcessor(g ->  new GenericMessage<ItemDTO[]>(
                        g.getMessages().stream()
                                .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
                                .collect(Collectors.toList()).toArray(new ItemDTO[0]))))
            .get();
}
4

1 に答える 1

4

実際には、そのようには機能しません。

.aggregate()は、その の3番目の加入者ですpublishSubscribeChannel

流れを 2 つに切断する必要があります。このような:

    @Bean
    public IntegrationFlow publishSubscribeFlow() {
        return flow -> flow
                .publishSubscribeChannel(s -> s
                        .applySequence(true)
                        .subscribe(f -> f
                                .handle((p, h) -> "Hello")
                                .channel("publishSubscribeAggregateFlow.input"))
                        .subscribe(f -> f
                                .handle((p, h) -> "World!")
                                .channel("publishSubscribeAggregateFlow.input"))
                );
    }

    @Bean
    public IntegrationFlow publishSubscribeAggregateFlow() {
        return flow -> flow
                .aggregate(a -> a.outputProcessor(g -> g.getMessages()
                        .stream()
                        .<String>map(m -> (String) m.getPayload())
                        .collect(Collectors.joining(" "))))
                .channel(c -> c.queue("subscriberAggregateResult"));
    }

.channel("publishSubscribeAggregateFlow.input")両方の加入者からの使用に注意してください。

正直に言うと、それは任意のポイントですpublish-subscribe。サブスクライバーを集計する場合、すべてのサブスクライバーの結果をどこに送信するかを知っておく必要があります。

あなたのユースケースは、Scatter-Gather EIP パターンを思い起こさせます。

DSL にはまだ実装されていません。この問題についてGHの問題を提起してください。今後の1.2バージョンで対応できるように努めます。

アップデート

問題に関するGHの問題: https://github.com/spring-projects/spring-integration-java-dsl/issues/75

于 2016-04-20T15:48:45.417 に答える