3

このスレッドによると、 conCatMap と flatmap はアイテムが発行される順序のみが異なります。そこで、テストを行い、整数の単純なストリームを作成し、それらがどの順序で放出されるかを確認したいと考えました。1 ~ 5 の範囲の数値を取り、それらを 2 倍する小さなオブザーバブルを作成しました。簡単。

フラットマップを使用したコードは次のとおりです。

myObservable.flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from flatMap:"+integer);
        }
    });

concatMap を使用したまったく同じコード:

myObservable.concatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from concatmap:"+integer);
        }
    });

ログで印刷を見たとき、順序が両方で同じだったのはなぜですか? concatMap だけが順序を保持すると思いましたか?

4

1 に答える 1

10

あなたが見ているのは偶然です。あなたが値を返すたびflatMapに、前のものと同じスレッドでそうします。

マルチスレッドを利用するように例を変更しました。

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .flatMap(integer -> Observable.just(integer)
                .observeOn(Schedulers.computation())
                .flatMap(i -> {
                    try {
                        Thread.sleep(new Random().nextInt(1000));
                        return Observable.just(2 * i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        return Observable.error(e);
                    }
                }))
        .subscribe(System.out::println,
                Throwable::printStackTrace,
                () -> System.out.println("onCompleted"));

2 * i異なる順序を強制するために、ランダムな遅延によって各値を遅らせています。observeOn(Schedulers.computation())また、次の演算子 ( flatMap) が計算スレッド プールで実行されるように、この前に追加しました。これにより、マルチスレッド マジックが実行されます。

これは、私の例 (Android の場合) で得られる出力です。

I/System.out: 6
I/System.out: 4
I/System.out: 12
I/System.out: 14
I/System.out: 8
I/System.out: 2
I/System.out: 16
I/System.out: 20
I/System.out: 10
I/System.out: 18
I/System.out: onCompleted

flatMapの後justをに置き換えるとconcatMap、適切な順序で出力されます。

適切な説明が記載された Thomas Nield による素晴らしい投稿があります。

于 2016-03-21T14:11:40.963 に答える