3

冷たい例を考えると:

Observable<Integer> cold = Observable.create(subscriber -> {
  try {
    for (int i = 0; i <= 42; i++) {

      // avoid doing unnecessary work
      if (!subscriber.isUnsubscribed()) {
        break;
      }

      subscriber.onNext(i);
    }
    subscriber.onCompleted();
  } catch (Throwable cause) {
    subscriber.onError(cause);
  }
});

新しいサブスクライバーごとにゼロから実行を開始します。

// starts execution
cold.subscribe(...)

サブスクライバーが早期にサブスクライブを解除すると、実行を停止できます。

// stops execution
subscription.unsubscribe();

サンプルの for ループの代わりに、実際のビジネス ロジックが進行している場合 (サブスクライバーごとにリプレイする必要はなく、代わりにリアルタイムである必要があります)、ホット オブザーバブルを扱っています...

PublishSubject<Integer> hot = PublishSubject.create();

Thread thread = new Thread(() -> {
  try {
    for (int i = 0; i < 42; i++) {
      // how to avoid unnecessary work when no one is subscribed?
      hot.onNext(i);
    }
    hot.onCompleted();
  } catch (Throwable cause) {
    hot.onError(cause);
  }
});

開始したいときに実行する可能性があります

// stats work (although no one is subscribed) 
thread.start();

したがって、最初の質問:最初のオブザーバーがサブスクライブしたときにのみ作業を開始するにはどうすればよいですか? (コネクティブルオブザーバブルかな?)

そして重要な質問:最後のサブスクライバーがサブスクライブを解除したときに作業を停止するにはどうすればよいですか? (そのサブジェクトの現在のサブスクリプションにアクセスする方法がわかりません。そのようなソリューションが存在する場合、グローバル状態を共有せずにクリーンなソリューションを見つけたいと思います)

私が考えることができる1つの解決策は、サブスクライバーを管理するカスタムオペレーターで件名を持ち上げることです...

4

1 に答える 1

4

演算子refCountを参照してください- http://reactivex.io/documentation/operators/refcount.html。この Operator はObservableConnectableObservableに変換し、最初のサブスクライバーがサブスクライブするときに接続し、サブスクリプションがなくなると切断します

于 2015-12-25T13:33:58.537 に答える