7

株価の流れを表すオブザーバブルがあります。監視可能なシーケンスにオブザーバーがいない場合、価格のストリームを提供しているリモート サーバーから切断できるようにしたいのですが、すべてのオブザーバーが Dispose() を呼び出すまではそうしたくありません。次に、同様の方法で、最初の人が Subscribe を呼び出したときに、リモート サーバーに再接続したいと思います。

オブザーバブルでサブスクライブを呼び出したオブザーバーの数を把握する方法はありますか? それとも、オブザーバーがいつ Subscribe または Dispose を呼び出しているかを知る方法でしょうか?

4

4 に答える 4

10

私は単純に RefCount / Publish を使用します。IObservable を実装していると、私は一生懸命働きすぎているようにいつも感じます。

myColdObservable.Publish().RefCount();

これにより、全員が切断された後、観察可能な停止パルスが発生します。サンプルは次のとおりです。

var coldObservable = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .ObserveOn(Scheduler.TaskPool)
    .Select(_ => DoSomething());

var refCountObs = coldObservable.Publish().RefCount();

CompositeDisposable d = new CompositeDisposable();
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n)));

//Wait a bit for work to happen
System.Threading.Thread.Sleep(10000);

//Everyone unsubscribes
d.Dispose();

//Observe that DoSomething is not called.
System.Threading.Thread.Sleep(3000);

実際に登録者数を知りたい場合はこれに該当しませんが、登録者がいない場合は業務を停止するという要件には合致すると思います。

于 2012-05-31T19:25:25.223 に答える
6

少し古いものですが、サブスクライバーの数を知る必要があるという問題があったため、この投稿に出くわしました. Bart の提案を使用して、この拡張機能を思いつきました。

public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged)
{
 int count = 0;

 return Observable.Defer(() =>
 {
    count = Interlocked.Increment(ref count);
    countChanged(count);
    return source.Finally(() =>
     {
        count = Interlocked.Decrement(ref count);
        countChanged(count);
     });
 });
}
于 2015-06-18T13:36:32.627 に答える
4

一般に、IObservableを実装しないでください。通常、Rxには、直接または構成を通じて役立つものがすでにあります。IObservableを実装する必要がある場合は、Observable.Createを使用して実装し、オブザーバー契約などに必要なすべての保証を取得します。

あなたの問題に関しては-PublishとRefCountを使用するという提案はまさにあなたが探している構成です。何らかの理由で自分自身をカウントしたい場合は、Observable.Deferを使用してサブスクリプションをインターセプトし、場合によってはObservable.Finallyを使用してシーケンスの終了をインターセプトします。または、ソースをObservable.Createでラップし、オブザーバーをラップされたシーケンスに転送し、返されたIDisposableをカウントロジックでラップします(Disposable.Createを使用)。

乾杯、

-バート(Rxチーム)

于 2012-06-02T10:09:08.453 に答える
3

IObservable<T>実装できるインターフェースです。インターフェイスの Subscribe メソッドでは、リストを内部的に維持することでオブザーバーを追跡できます。

次のコード スニペットは MSDN からのものです。

private List<IObserver<Location>> observers;

public IDisposable Subscribe(IObserver<Location> observer) 
{
   if (! observers.Contains(observer)) 
      observers.Add(observer);

   // ------- If observers.Count == 1 create connection. -------

   return new Unsubscriber(observers, observer);
}
private class Unsubscriber : IDisposable
{
   private List<IObserver<Location>>_observers;
   private IObserver<Location> _observer;

   public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer)
   {
      this._observers = observers;
      this._observer = observer;
   }

   public void Dispose()
   {
      if (_observer != null && _observers.Contains(_observer))
         _observers.Remove(_observer);
      // ----------- if observers.Count == 0 close connection -----------
   }
}
于 2012-05-31T18:07:15.103 に答える