20

私は RxJava を学習しており、最初の実験として、このコードrun()の最初のメソッドのコードを書き直して (RxJava が解決できる問題としてNetflix のブログで引用されています)、RxJava を使用して非同期性を改善しようとしています。最初の Future ( ) の結果を待ってから、残りのコードに進みます。f1.get()

f3に依存しf1ます。私はこれを処理する方法を見flatMapて、トリックを行うようです:

Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
        }
    });

次に、f4依存f5f2ます。私はこれを持っています:

final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer i) {
            Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
            Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
            return Observable.merge(f4Observable, f5Observable);
        }
    });

これは奇妙になり始めます(mergeおそらく私が望むものではないでしょう...)が、最後にこれを行うことができますが、私が望むものとはまったく異なります。

f3Observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Observed from f3: " + s);
        f4And5Observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println("Observed from f4 and f5: " + i);
            }
        });
    }
});

それは私に与えます:

Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100

これはすべての数字ですが、残念ながら、結果は別々の呼び出しで得られるため、元のコードの最後の println を完全に置き換えることはできません。

System.out.println(f3.get() + " => " + (f4.get() * f5.get()));

同じ行でこれらの両方の戻り値にアクセスする方法がわかりません。ここで見逃している関数型プログラミング fu がおそらくあると思います。これどうやってするの?ありがとう。

4

3 に答える 3

19

あなたが本当に必要としているのは、RX がどのように使用されているかについてのもう少しの励ましと視点だけのようです。ドキュメントと大理石の図をもっと読むことをお勧めします(それらが常に役立つとは限りません)。lift()また、関数と演算子 を調べることをお勧めします。

  • オブザーバブルの要点は、データフローとデータ操作を単一のオブジェクトに連結することです
  • mapflatMapおよびの呼び出しのポイントはfilter、データ フロー内のデータを操作することです。
  • マージのポイントは、データ フローを結合することです。
  • オペレーターのポイントは、オブザーバブルの安定したストリームを中断し、データ フローで独自の操作を定義できるようにすることです。たとえば、移動平均演算子をコーディングしました。それは double の Observable で s を合計してn double、移動平均のストリームを返します。コードは文字通りこのように見えました

    Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

当然のことと思っている多くのフィルタリング方法がすべて内部にあることに安心するでしょうlift()

とは言うものの; 複数の依存関係をマージするために必要なのは、次のとおりです。

  • mapまたはを使用して、すべての受信データを標準データ型に変更するflatMap
  • 標準データ型をストリームにマージする
  • あるオブジェクトが別のオブジェクトを待機する必要がある場合、またはストリーム内のデータを順序付ける必要がある場合は、カスタム オペレーターを使用します。注意: このアプローチではストリームが遅くなります
  • リストまたはサブスクライブしてそのすべてのデータを収集するために使用する
于 2014-08-12T22:07:23.440 に答える
7

編集:誰かが、私が質問の編集として追加した次のテキストを回答に変換しました。それを行う正しい方法。私はこのコードを使用したり、コピーするように勧めたりしません。他の/より良い解決策とコメントを歓迎します!


以下の方法で解決できました。flatMapオブザーバブルを複数回使用できるとは思いませんでした。結果は 1 回しか消費できないと思っていました。そのflatMapため、f2Observable を 2 回 (申し訳ありませんが、最初の投稿以降、コード内のいくつかの名前を変更しました)、次にzipすべての Observables でサブスクライブします。タイプのジャグリングのため、値を集約することは望ましくありませんMap他の/より良い解決策とコメントを歓迎します! 完全なコードは gist で表示できます。ありがとうございました。zip

Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
Observable<Integer> f2Observable = Observable.from(f2);
Observable<Integer> f4Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer));
            return Observable.from(f4);
        }       
    });     

Observable<Integer> f5Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer));
            return Observable.from(f5);
        }       
    });     

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
    @Override
    public Map<String, String> call(String s, Integer integer, Integer integer2) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("f3", s);
        map.put("f4", String.valueOf(integer));
        map.put("f5", String.valueOf(integer2));
        return map;
    }       
}).subscribe(new Action1<Map<String, String>>() {
    @Override
    public void call(Map<String, String> map) {
        System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5"))));
    }       
});     

そして、これは私に望ましい出力をもたらします:

responseB_responseA => 714000
于 2014-09-20T05:25:58.290 に答える