3

RxJs には、変換関数を使用してシーケンスを再帰的に展開できる Observable.expand というメソッドがあります。

例えば、

Rx.Observable.return(0).expand(function (x) { return Rx.Observable.return(x+1); }) 

すべての整数を出力します

ただし、RxJava でこのメソッドを見つけることができません。同様の目標を達成できるRxJavaの他のメソッドはありますか?

exapand() の詳細な仕様については、https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/expand.mdを参照してください。

4

2 に答える 2

4

さて、あなたが示した例は次のように簡略化できます。

Observable.range(0, Integer.MAX_VALUE)

しかし、実際にはもっと複雑なことをしたいと思っていると思います。scanはあなたが探しているものと同一ではありませんが、同様のことができますTransformer

との主な違いscanは、すべてのステップで新しい入力値が必要ですが、展開と同様に、前の値を保持することです。新しい入力値を無視するだけで、それを乗り越えることができます。スキャンはエキスパンドに似ているため、scanかなり大きな欠陥がある例から始めて、より良いオプションを検討します。

public class Expand<T, T> implements Transformer<T, T> {
  private final Func1<T, T> expandFunc;

  public Expand(final Func1<T, T> expandFunc) {
    this.initialValue = initialValue;
    this.expandFunc = expandFunc;
  }

  @Override
  public Observable<T> call(Observable<T> source) {
    // Here we treat emissions from the source as a new 'initial value'.

    // NOTE: This will effectively only allow one input from the source, since the
    // output Observable expands infinitely. If you want it to switch to a new expanded
    // observable each time the source emits, use switchMap instead of concatMap.
    return source.concatMap(new Func1<T, Observable<T>>() {

      @Override
      public Observable<T> call(T initialValue) {
        // Make an infinite null Observable, just for our next-step signal.
        return Observable.<Void>just(null).repeat()
            .scan(initialValue, new Func2<T, Void, T>() {

              @Override
              public T call(final T currentValue, final Void unusedSignal) {
                return expandFunc.call(currentValue);
              }

            });
      }

    });
  }
}

そのトランスフォーマーを使用するために、現在の数値を取得し、1 を追加して、それを 2 乗するメソッドを作成しましょう。

Observable.just(1).compose(new Expand(new Func1<Integer, Integer>() {

  @Override
  public Integer call(final Integer input) {
    final Integer output = input + 1;
    return output * output;
  }

});

いずれにせよ、おそらく、このアプローチの主な厄介な点のいくつかに気付いたでしょう。まず、switch と concatMap の関係があり、これが基本的に Observable 出力からの 1 つのアイテムを無限チェーンに変える方法です。第二に、「Void」シグナル Observable 全体は必要ないはずです。確かに、またはその他の多くのものを使用できますrangejust(1).repeat()、それでも最終的には捨てられます。

これをよりきれいに再帰的にモデル化する方法を次に示します。

public static <T> Observable<T> expandObservable(
    final T initialValue, final Func1<T, T> expandFunc) {
  return Observable.just(initialValue)
      .concatWith(Observable.defer(new Func0<Observable<T>>() {

        @Override
        public Observable<T> call() {
          return expandObservable(expandFunc.call(initialValue), expandFunc);
        }

      });
}

したがって、この例では、各再帰パスが現在の値を出力します (各ステップで展開され、次のステップと連結されます)。deferこれは、無限再帰がすぐに発生しないようにするために使用されます。これを使用すると、次のようになります。

expandObservable(1, new Func1<Integer, Integer>() {

  @Override
  public Integer call(final Integer input) {
    final Integer output = input + 1;
    return output * output;
  }

}).subscribe(/** do whatever */);

そのため、例によく似ていcomposeますが、より整然としたクリーンな実装です。

それが役立つことを願っています。

于 2015-01-26T00:23:42.467 に答える
-1

私はあなたが探していると思いますmap

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#map

numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.map({it * it}).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);

1
4
9
16
25
Sequence complete
于 2014-11-14T12:05:44.393 に答える