0

次のコードがあります。

var observable = ... subscribe to event here ...

var windows = observable.Window(TimeSpan.FromSeconds(240));

aggregatedWindows = windows.SelectMany(
    window => window.Aggregate(new Context(), AggregateContext));

subscription = aggregatedWindows.Subscribe(OnWindow);

... later

subscription.Dispose();

ウィンドウの処理中に誰かが私のアプリを閉じるように要求したシナリオを想像してみてください。このサブスクリプションを破棄すると、処理中のイベントが停止しますが、情報の最後のウィンドウも失われます。

これに対処する最善の方法が何であるかはわかりません...

集計関数を介して渡される最後のウィンドウでローカル状態を保存できます (ただし、これは間違っているようです)...

どんな助けでも大歓迎です!

4

2 に答える 2

0

アグリゲーションのサブスクリプションを保持する代わりに、ウィンドウ処理を操作できます。これは、最後のウィンドウが到着するまで接続を維持し、パーティション分割に時間がかかりすぎる場合に備えて、タイムアウトを使用して切断します。

ここでは別のクラスが使用されています。使用Createすると Auto Detach になるためです。これにより、dispose 呼び出しが行われた後、オブザーバーがすぐに切断されます。基本的に、Dispose の意味は、ここで変更されたものです。

    public static IObservable<T> DeferDisconnection<T>(this IObservable<T> observable, TimeSpan timeout)
    {
        return new ClosingObservable<T>(observable, timeout);
    }


    public class ClosingObservable<T> : IObservable<T>
    {

        private readonly IConnectableObservable<T> Source;
        private readonly IDisposable Subscription;
        private readonly TimeSpan Timeout;

        public ClosingObservable(IObservable<T> observable, TimeSpan timeout)
        {
            Timeout = timeout;
            Source = observable.Publish();
            Subscription = Source.Connect();
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            Source.Subscribe(observer);

            return Disposable.Create(() => Source.Select(_ => new Unit())
                                                 .Amb(Observable.Timer(Timeout).Select(_ => new Unit()))
                                                 .Subscribe(_ => Subscription.Dispose())
                                                 );
        }
    }

テスト:

            var disposable =
            Observable.Interval(TimeSpan.FromSeconds(2))
                      .Do(Console.WriteLine)
                      .DeferDisconnection(TimeSpan.FromSeconds(5))
                      .Subscribe();

            Console.ReadLine();

            disposable.Dispose();

            Console.ReadLine();
于 2012-06-26T22:11:58.720 に答える
-1

最後に部分的なウィンドウが表示されることで確認されるように、これは機能します。

class Program
{
    public class Context
    {
        public int count;
    }

    static Context AggregateContext(Context c, long i)
    {
        c.count++;
        return c;
    }

    static void OnWindow(Context c) { Console.WriteLine(c.count); }

    static void Main(string[] args)
    {
        var canceled = new Subject<bool>();

        var observable = Observable.Interval(TimeSpan.FromSeconds(.1)).TakeUntil(canceled);

        var windows = observable.Window(TimeSpan.FromSeconds(3));

        var aggregatedWindows = windows.SelectMany(
            window => window.Aggregate(new Context(), AggregateContext));

        var subscription = aggregatedWindows.Subscribe(OnWindow);

        Thread.Sleep(TimeSpan.FromSeconds(10));

        canceled.OnNext(true);
        subscription.Dispose();

        Console.WriteLine( @"Output should have been something like 30,30,30,30,10" );
        Console.ReadLine();
    }
} 
于 2012-06-26T22:40:26.563 に答える