2

短編小説: 1 つの目的を持つ 2 つの Observable がある状況があります。

  • 彼らはいくつかのデータを受け取ります
  • 変更されたデータを返します
  • データを処理できない場合はエラーをスローします

それぞれが異なる種類のデータの処理を担当しています。さらに、両方のデータが処理されたときに何かをしたいと考えています。

私の現在の最良の実装は次のとおりです。これらは私の Observables です。

    Single<BlueData> blueObservable = Single.create(singleSubscriber -> {
        if (BlueDataProcessor.isDataValid(myBlueData)) {
            singleSubscriber.onSuccess(BlueDataProcessor.process(myBlueData));
        }
        else {
            singleSubscriber.onError(new BlueDataIsInvalidThrow());
        }
    });

    Single<RedData> redObservable = Single.create(singleSubscriber -> {
        if (RedDataProcessor.isDataValid(myRedData)) {
            singleSubscriber.onSuccess(RedDataProcessor.process(myRedData));
        }
        else {
            singleSubscriber.onError(new RedDataIsInvalidThrowable());
        }
    });

    Single<PurpleData> composedSingle = Single.zip(blueObservable, redObservable,
            (blueData, redData) -> PurpleGenerator.combine(blueData, redData));

また、次のサブスクリプションがあります。

    blueObservable.subscribe(
            result -> {
                saveBlueProcessStats(result);
            },
            throwable -> {
                logError(throwable);
            });

    redObservable.subscribe(
            result -> {
                saveRedProcessStats(result);
            },
            throwable -> {
                logError(throwable);
            });


    composedSingle.subscribe(
            combinedResult -> {
                savePurpleProcessStats(combinedResult)
            },
            throwable -> {
                logError(throwable);
            });

私の問題: 青と赤のデータは 2 回処理されます。これは、Observable.zip() で作成された結合されたオブザーバブルにサブスクライブすると、両方のサブスクリプションが再度実行されるためです。

両方の操作を 2 回実行せずにこの動作を行うにはどうすればよいですか?

4

1 に答える 1

2

Singleこれは1.x では不可能です。これは、 aConnectableSingleおよび したがっての概念がないためSingle.publishです。2.x と RxJava2Extensions ライブラリを介して効果を得ることができます。

SingleSubject<RedType> red = SingleSubject.create();
SingleSubject<BlueType> blue = SingleSubject.create();

// subscribe interested parties
red.subscribe(...);
blue.subscribe(...);

Single.zip(red, blue, (r, b) -> ...).subscribe(...);

// connect()
blueObservable.subscribe(blue);
redObservable.subscribe(red);
于 2016-11-16T17:40:57.720 に答える