4

私はObservable毎秒ティックを発する を持っています:

Observable.interval(0, 1, TimeUnit.SECONDS)
    .take(durationInSeconds + 1));

この Observable を一時停止して数値の発行を停止し、必要に応じて再開したいと思います。

いくつかの落とし穴があります:

  • ObservableJavadoc によると、intervalオペレーターはバックプレッシャーをサポートしていません
  • backpressure に関するRxJava wiki には、 backpressureのフロー制御の代替手段としての Callstack ブロッキングに関するセクションがあります。

生産性の高い Observable を処理するもう 1 つの方法は、コールスタックをブロックすることです (生産性の高い Observable を管理するスレッドを停止します)。これには、Rx の「リアクティブ」でノンブロッキング モデルに反するという欠点があります。ただし、問題のある Observable が安全にブロックできるスレッド上にある場合、これは実行可能なオプションになる可能性があります。現在、RxJava はこれを容易にするためのオペレーターを公開していません。

intervalObservableを一時停止する方法はありますか? または、バックプレッシャーをサポートする独自の「刻む」Observableを実装する必要がありますか?

4

1 に答える 1

8

これには多くの方法があります。たとえば、interval()ブール値フラグ「一時停止」とカウンターの 2 つの追加状態を引き続き使用および維持できます。

public static final Observable<Long> pausableInterval(
  final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {

  final AtomicLong counter = new AtomicLong();
  return Observable.interval(initial, interval, unit, scheduler)
      .filter(tick -> !paused.get())
      .map(tick -> counter.getAndIncrement()); 
}

次に、一時停止/再開する場所で paused.set(true/false) を呼び出すだけです

編集 2016-06-04

上記の解決策には少し問題があります。監視可能なインスタンスを複数回再利用すると、最後の登録解除時の値から開始されます。例えば:

Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

list1 は [0,1,2,3,4] と予想されますが、list2 は実際には [5,6,7,8,9] になります。上記の動作が望ましくない場合は、オブザーバブルをステートレスにする必要があります。これは、scan() 演算子によって実現できます。改訂版は次のようになります。

  public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay, 
      final long period, TimeUnit unit, Scheduler scheduler) {

    return Observable.interval(initialDelay, period, unit, scheduler)
        .filter(tick->!pause.get())
        .scan((acc,tick)->acc + 1);
  }

または、Java 8 とラムダに依存したくない場合は、Java 6+ 互換コードを使用して次のようにすることができます。

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361

于 2016-03-04T20:03:30.993 に答える