4

Observableにたくさんの変換があるとします:

operation()
    .flatMap(toSomething())
    .map(toSomethingElse())
    .flatMap(toYetSomethingElse())
    .subscribeOn(Schedulers.newThread())
    .observeOn(AdroidSchedulers.mainThread())
    .subscribe(observer);

への最後の呼び出しを除いて、これらの操作はすべて同期していflatMap()ますか? それとも、サブスクライブするように指示したスレッドですべての操作が実行されますか?

4

1 に答える 1

2

私はこれをテストで理解しました。次のテスト パス (これは、Observable の放出がすべて同じバックグラウンド スレッド上にあることを意味します):

    volatile long observableThreadId;

    @Test
    public void transformedObservables_shouldRunInSameThread() {

        Observable.from(new String[]{"a", "b", "c"}) //
            .flatMap(new Func1<String, Observable<Object>>() {
                @Override public Observable<Object> call(String s) {
                    observableThreadId = Thread.currentThread().getId();
                    return Observable.from((Object) s);
                }
            }) //
            .map(new Func1<Object, String>() {
                @Override public String call(Object o) {
                    long id = Thread.currentThread().getId();
                    if (id != observableThreadId) {
                        throw new RuntimeException("Thread ID mismatch");
                    }

                    return (String) o;
                }
            }) //
            .flatMap(new Func1<String, Observable<String>>() {
                @Override public Observable<String> call(String s) {
                    long id = Thread.currentThread().getId();
                    if (id != observableThreadId) {
                        throw new RuntimeException("Thread ID mismatch");
                    }

                    return Observable.from(s);
                }
            }) //
            .subscribeOn(Schedulers.newThread()) //
            .observeOn(Schedulers.currentThread()) //
            .subscribe(new Observer<String>() {
                @Override public void onCompleted() {
                    assertThat(Thread.currentThread().getId()).isNotEqualTo(observableThreadId);
                }

                @Override public void onError(Throwable throwable) {

                }

                @Override public void onNext(String s) {

                }
            });

        System.out.println("blah");
    }

=============================== 更新:

より良い答えは、実際にはSchedulerの ReactiveX ドキュメントにあります。

デフォルトでは、Observable とそれに適用する一連のオペレーターは、そのSubscribeメソッドが呼び出されたスレッドと同じスレッドで機能し、そのオブザーバーに通知します。SubscribeOnオペレーターは、Observable が動作する別の Scheduler を指定することで、この動作を変更します。ObserveOnオペレーターは、Observable がオブザーバーに通知を送信するために使用する別のスケジューラーを指定します。

... SubscribeOnオペレーターは、そのオペレーターが呼び出されるオペレーターのチェーンのどの時点に関係なく、Observable が操作を開始するスレッドを指定します。一方、 ObserveOnは、Observable がその演算子が表示される場所の下で使用するスレッドに影響を与えます。このため、Observable オペレーターのチェーン中のさまざまな時点で ObserveOn を複数回呼び出して、それらのオペレーターの特定が動作するスレッドを変更することができます。

于 2014-03-11T21:05:27.540 に答える