6

オブザーバブルを (さまざまな方法で) 作成し、関係者に返しますが、彼らが聞き終わったら、リソースを消費し続けないようにオブザーバブルを破棄したいと考えています。これは、pub サブシステムでトピックを作成することと考える別の方法です。誰もトピックにサブスクライブしていない場合、トピックとそのフィルタリングを保持したくありません。

4

2 に答える 2

10

Rxには、ニーズに合わせた演算子がすでにあります。実際には2つPublishですRefCount

それらの使用方法は次のとおりです。

IObservable xs = ...

var rxs = xs.Publish().RefCount();

var sub1 = rxs.Subscribe(x => { });
var sub2 = rxs.Subscribe(x => { });

//later
sub1.Dispose();

//later 
sub2.Dispose();

//The underlying subscription to `xs` is now disposed of.

単純。

于 2011-09-22T06:06:28.177 に答える
1

あなたの質問を理解したら、すべてのサブスクライバーがサブスクリプションを破棄したとき、つまりサブスクライバーがなくなったときにオブザーバブルを作成したい場合は、オブザーバブルのさらなる値の生成を停止するクリーンアップ機能を実行したいと考えています。これがあなたが望むものなら、あなたは以下のようなことをすることができます:

//Wrap a disposable
public class WrapDisposable : IDisposable
    {
        IDisposable disp;
        Action act;
        public WrapDisposable(IDisposable _disp, Action _act)
        {
            disp = _disp;
            act = _act;
        }
        void IDisposable.Dispose()
        {
            act();
            disp.Dispose();
        }
    }

    //Observable that we want to clean up after all subs are done
    public static IObservable<long> GenerateObs(out Action cleanup)
    {
        cleanup = () =>
        {
            Console.WriteLine("All subscribers are done. Do clean up");
        };
        return Observable.Interval(TimeSpan.FromSeconds(1));
    }
    //Wrap the observable
    public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone)
    {
        int count = 0;
        return Observable.CreateWithDisposable<T>(ob =>
        {
            var disp = obs.Subscribe(ob);
            Interlocked.Increment(ref count);
            return new WrapDisposable(disp,() =>
            {
                if (Interlocked.Decrement(ref count) == 0)
                {
                    onAllDone();                                                
                }
            });
        });
    }

//使用例:

Action cleanup;
var obs = GenerateObs(out cleanup);
var newObs = WrapToClean(obs, cleanup);
newObs.Take(6).Subscribe(Console.WriteLine);
newObs.Take(5).Subscribe(Console.WriteLine);
于 2011-09-22T05:28:11.557 に答える