0

project-reactor を使用して、外部 Web サービスからいくつかのデータを取得し、結果のオブジェクトを生成します。

最初に、次の Web サービス呼び出しをトリガーするために必要なマスターデータを取得する必要があります。マスターデータが利用可能になったら、マスターデータの結果に基づいてさらにデータを取得します。次に、すべてのモノがその結果を発行するのを待つ必要があります。次に、すべてのデータを処理し、結果のオブジェクトを構築します。

リアクティブストリームの経験はあまりありません。ネストされたサブスクリプションを使用したソリューションは機能しますが、やりたいことをアーカイブするためのより良い方法があるかもしれないと信じています.

質問1

Masterdata_A と Masterdata_B は並行して取得できますが、ネスティングせずにリアクティブな方法でこれを表現するにはどうすればよいでしょうか? getFluxMasterdata_B の各結果は、getMonoMasterdata_A の 1 つの結果と組み合わせる必要があります。

質問2

両方の部品を持つ Tupel は、多くのデータ要求で Web サービスを圧倒しないように、何らかの方法で制限する必要があります。実際の 1 秒の遅延は推測にすぎませんが、最初の内部 flatMap の並列実行の最大数を定義して、一度に最大 N 個の Web サービス呼び出しを待機させることをお勧めします。

質問 3

将来的には、ProcessingResult を構築するために Web サービスからフェッチしなければならないデータがさらに存在する可能性があります。リアクティブストリームを定義して読みやすく/理解しやすくするベストプラクティスはありますか? リアクティブ ストリームのネストは問題ありませんか、それとも避けるべきですか (すべてをトップ レベルに維持します)。


ドメインモデル

    private static class Masterdata_A
    {
        private List<MasterdataRecord_A> records;
    }

    private static class MasterdataRecord_A { /* ... business relevant fields */ }
    private static class MasterdataRecord_B { /* ... business relevant fields */ }
    private static class Data_A { /* ... business relevant fields */ }
    private static class Data_B { /* ... business relevant fields */ }
    private static class Data_C { /* ... business relevant fields */ }

    private static class ProcessingResult { /* ... business relevant fields */ }

WebserviceImpl

    private static class Webservice
    {
        private Mono<Masterdata_A> getMonoMasterdata_A() { /* fetch data from external webservice */ }
        private Flux<MasterdataRecord_B> getFluxMasterdata_B() { /* fetch data from external webservice */ }

        private Mono<Data_A> getMonoData_A() { /* fetch data from external webservice */ }
        private Mono<Data_B> getMonoData_B() { /* fetch data from external webservice */ }
        private Mono<Data_C> getMonoData_C() { /* fetch data from external webservice */ }
    }

BusinessServiceImpl

    public class BusinessService
    {
        public void processData(...params...)
        {
            Webservice webservie = getWebservice();
            // As soon as Mono<Masterdata_A> emits its result AND Flux<Masterdata_B> emits its first result than the first inner flatMap should be executed
            // to fetch some extra data from the service based on the actual masterdata.
            // For building the ProcessingResult we need access to all data available in the actual context.
            webservice.getMonoMasterdata_A()
                    .subscribe((Masterdata_A masterdataA) -> {
                        webservice.getFluxMasterdata_B()
                                .delayElements(Duration.ofSeconds(1))
                                .flatMap((MasterdataRecord_B masterdataB) -> {
                                    Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
                                    Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
                                    Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
                                    // wait for result of all Monos
                                    return Mono.zip(monoA, monoB, monoC);
                                })
                                .flatMap((Tuple3<Data_A, Data_B, Data_C> data) -> {
                                    Data_A dataA = data.getT1();
                                    Data_B dataB = data.getT2();
                                    Data_C dataC = data.getT3();

                                    // create result from masterdataA, masterdataB, dataA, dataB, dataC
                                    ProcessingResult result = ...;
                                    return Mono.just(result);
                                })
                                .subscribe(processingResult -> {
                                    // store result to db/filesystem
                                });
                    });
        }
    }
4

1 に答える 1