35

Reactive Extensionsを使用して、Subscribeメソッドの実行中に発生するイベントストリームからのメッセージを無視したいと思います。つまり、メッセージ間の時間よりもメッセージの処理に時間がかかることがあるので、処理する時間がないメッセージを削除したいと思います。

ただし、Subscribeメソッドが完了したときにメッセージが届いた場合は、最後のメッセージを処理したいと思います。そのため、私は常に最新のメッセージを処理します。

したがって、次のようなコードがある場合:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

また、「100」の処理に長い時間がかかると仮定した場合。次に、「100」が完了したときに「2」を処理する必要があります。「1」は、「100」がまだ処理されている間に「2」に置き換えられたため、無視する必要があります。

これは、バックグラウンドタスクを使用して必要な結果の例です。Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

ただし、Latest()はブロッキング呼び出しであり、このような次の値を待機するスレッドを持たないようにします(メッセージ間に非常に長いギャップがある場合があります)。

次のように、 TPLデータフローBroadcastBlockからを使用して必要な結果を取得することもできます。

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

しかし、これはRxで直接可能になるはずだと感じています。それを行うための最良の方法は何ですか?

4

9 に答える 9

9

これは、Daveの方法に似ていますが、Sample代わりに使用する方法です(これは、バッファーよりも適切です)。Daveの回答に追加したものと同様の拡張メソッドを含めました。

拡張機能:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();

    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });

    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));

    return sub;
}

これはより単純であり、起動される「空の」バッファがないことに注意してください。アクションに送信される最初の要素は、実際にはストリーム自体から取得されます。

使用法は簡単です:

messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});

messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing

そして結果:

source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
于 2012-06-14T05:53:24.930 に答える
3

これは「ちょうど」Rxを使用する試みです。タイマーとサブスクライバーはスレッドプールを監視することで独立した状態に保たれ、私はサブジェクトを使用してタスクの完了に関するフィードバックを提供しました。

これは簡単な解決策ではないと思いますが、改善のアイデアが得られることを願っています。

messages.
    Buffer(() => feedback).
    Select(l => l.LastOrDefault()).
    ObserveOn(Scheduler.ThreadPool).
    Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
        feedback.OnNext(Unit.Default);
    });

feedback.OnNext(Unit.Default);

わずかな問題が1つあります。バッファが空になると最初に閉じられるため、デフォルト値が生成されます。最初のメッセージの後にフィードバックを行うことで、おそらくそれを解決できます。


これは拡張機能としてです:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var feedback = new Subject<Unit>();

    var sub = source.
        Buffer(() => feedback).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l.LastOrDefault());
            feedback.OnNext(Unit.Default);
        });

    feedback.OnNext(Unit.Default);

    return sub;
}

そして使用法:

    messages.SubscribeWithoutOverlap(n =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(n);
    });
于 2012-06-13T23:00:48.530 に答える
3

Observable.Switchを使用した例。また、タスクを完了したがキューに何もない場合も処理します。

using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive
{
    public static class RXX
    {
        public static IDisposable SubscribeWithoutOverlap<T>
        ( this IObservable<T> source
        , Action<T> action
        , IScheduler scheduler = null)
        {
            var sampler = new Subject<Unit>();
            scheduler = scheduler ?? Scheduler.Default;
            var p = source.Publish();
            var connection = p.Connect();

            var subscription = sampler.Select(x=>p.Take(1))
                .Switch()
                .ObserveOn(scheduler)
                .Subscribe(l =>
                {
                    action(l);
                    sampler.OnNext(Unit.Default);
                });

            sampler.OnNext(Unit.Default);

            return new CompositeDisposable(connection, subscription);
        }
    }
}
于 2013-04-08T10:12:11.647 に答える
3

Lee Campbell( Intro To Rxで有名)のおかげで、この拡張メソッドを使用した実用的なソリューションができました。

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}
于 2013-04-18T09:24:39.703 に答える
3

ロックの代わりにCASを使用し、再帰を回避するソリューションを使用して、これに関するブログ投稿を作成しました。コードは以下のとおりですが、完全な説明はここにあります:http ://www.zerobugbuild.com/?p=192

public static IObservable<TSource> ObserveLatestOn<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler)
{
    return Observable.Create<TSource>(observer =>
    {
        Notification<TSource> pendingNotification = null;
        var cancelable = new MultipleAssignmentDisposable();

        var sourceSubscription = source.Materialize()
            .Subscribe(notification =>
            {
                var previousNotification = Interlocked.Exchange(
                    ref pendingNotification, notification);

                if (previousNotification != null) return;

                cancelable.Disposable = scheduler.Schedule(() =>
                    {
                        var notificationToSend = Interlocked.Exchange(
                            ref pendingNotification, null);
                        notificationToSend.Accept(observer);
                    });
            });
            return new CompositeDisposable(sourceSubscription, cancelable);
    });
}
于 2013-05-19T18:47:28.123 に答える
1

Taskこれは、サブジェクトを使用しないキャンセルセマンティクスを使用したベースの実装です。disposeを呼び出すと、必要に応じて、サブスクライブされたアクションが処理をキャンセルできます。

    public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action)
    {
        var cancellation = new CancellationDisposable();
        var token = cancellation.Token;
        Task task = null;

        return new CompositeDisposable(
            cancellation,
            observable.Subscribe(value =>
            {
                if (task == null || task.IsCompleted)
                    task = Task.Factory.StartNew(() => action(value, token), token);
            })
        );
    }

簡単なテストは次のとおりです。

Observable.Interval(TimeSpan.FromMilliseconds(150))
                      .SampleSubscribe((v, ct) =>
                      {   
                          //cbeck for cancellation, do work
                          for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++)
                              Thread.Sleep(100);

                          Console.WriteLine(v);
                      });

出力:

0
7
14
21
28
35
于 2012-06-14T20:24:45.137 に答える
1

Rx 2.0 RCを使用Chunkifyすると、リストのIEnumerableを取得できます。各リストには、最後のMoveNext以降に観察されたものが含まれています。

次に、それToObservableをIObservableに変換し直して、空でない各リストの最後のエントリにのみ注意を払うことができます。

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

messages.Chunkify()
        .ToObservable(Scheduler.TaskPool)
        .Where(list => list.Any())
        .Select(list => list.Last())
        .Subscribe(n =>
        {
          Thread.Sleep(TimeSpan.FromMilliseconds(250));
          Console.WriteLine(n);
        });
于 2012-08-09T15:50:56.053 に答える
1

この問題に対する私自身の解決策が完成したばかりです(そしてすでに完全に改訂されています)。これを本番環境で使用する予定です。

スケジューラーが現在のスレッドを使用しない限り、ソースからの、への呼び出しはすぐOnNextに返されます。オブザーバーが以前の通知でビジー状態の場合、オブザーバーは指定可能な最大サイズのキューに入り、そこから以前の通知が処理されるたびに通知されます。キューがいっぱいになると、最新のアイテムは破棄されます。したがって、最大キューサイズ0は、オブザーバーがビジー状態のときに着信するすべてのアイテムを無視します。サイズが1の場合、常に最新のアイテムを観察できます。それが生産者に追いつくまで消費者を忙しく保つためのサイズアップ。OnCompletedOnErrorint.MaxValue

スケジューラーが長時間実行をサポートしている場合(つまり、独自のスレッドを提供している場合)、オブザーバーに通知するループをスケジュールします。それ以外の場合は、再帰的スケジューリングを使用します。

これがコードです。コメントをいただければ幸いです。

partial class MoreObservables
{
    /// <summary>
    /// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
    /// </summary>
    /// <param name="source">The source sequence.</param>
    /// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
    /// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
    /// <remarks>
    /// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
    /// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
    /// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
    /// </remarks>
    public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
        if (scheduler == null) scheduler = Scheduler.Default;

        return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer));
    }

    private static class LatestImpl<TSource>
    {
        public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));

            var longrunningScheduler = scheduler.AsLongRunning();
            if (longrunningScheduler != null)
                return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer);

            return new RecursiveSubscription(source, maxQueueSize, scheduler, observer);
        }

        #region Subscriptions

        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop.
        /// </summary>
        private sealed class LoopSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Head, // next notification is in _head
                Queue, // next notifications are in _queue, followed by _completion
                Disposed, // disposed
            }

            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly IObserver<TSource> _observer;
            private State _state;
            private TSource _head; // item in front of the queue
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification

            public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer)
            {
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                scheduler.ScheduleLongRunning(_ => Loop());
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }

            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _head = value;
                            _state = State.Head;
                            Monitor.Pulse(_subscription);
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }

            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _state = State.Queue;
                            Monitor.Pulse(_subscription);
                            _subscription.Dispose();
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }

            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;

                    _head = default(TSource);
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    Monitor.Pulse(_subscription);
                    _subscription.Dispose();
                }
            }

            private void Loop()
            {
                try
                {
                    while (true) // overall loop for all notifications
                    {
                        // next notification to emit
                        Notification<TSource> completion;
                        TSource next; // iff completion == null

                        lock (_subscription)
                        {
                            while (true)
                            {
                                while (_state == State.Idle)
                                    Monitor.Wait(_subscription);

                                if (_state == State.Head)
                                {
                                    completion = null;
                                    next = _head;
                                    _head = default(TSource);
                                    _state = State.Queue;
                                    break;
                                }
                                if (_state == State.Queue)
                                {
                                    if (!_queue.IsEmpty)
                                    {
                                        completion = null;
                                        next = _queue.Dequeue(); // assumption: this never throws
                                        break;
                                    }
                                    if (_completion != null)
                                    {
                                        completion = _completion;
                                        next = default(TSource);
                                        break;
                                    }
                                    _state = State.Idle;
                                    continue;
                                }
                                Debug.Assert(_state == State.Disposed);
                                return;
                            }
                        }

                        if (completion != null)
                        {
                            completion.Accept(_observer);
                            return;
                        }
                        _observer.OnNext(next);
                    }
                }
                finally { Dispose(); }
            }
        }

        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively.
        /// </summary>
        private sealed class RecursiveSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Scheduled, // emitter scheduled or executing
                Disposed, // disposed
            }

            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action
            private readonly IScheduler _scheduler;
            private readonly IObserver<TSource> _observer;
            private State _state;
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification

            public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
            {
                _scheduler = scheduler;
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }

            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _emitter.Disposable = _scheduler.Schedule(value, EmitNext);
                            _state = State.Scheduled;
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }

            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion));
                            _state = State.Scheduled;
                            _subscription.Dispose();
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }

            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;

                    _emitter.Dispose();
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    _subscription.Dispose();
                }
            }

            private void EmitNext(TSource value, Action<TSource> self)
            {
                try { _observer.OnNext(value); }
                catch { Dispose(); return; }

                lock (_subscription)
                {
                    if (_state == State.Disposed) return;
                    Debug.Assert(_state == State.Scheduled);
                    if (!_queue.IsEmpty)
                        self(_queue.Dequeue());
                    else if (_completion != null)
                        _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion));
                    else
                        _state = State.Idle;
                }
            }

            private void EmitCompletion(Notification<TSource> completion)
            {
                try { completion.Accept(_observer); }
                finally { Dispose(); }
            }
        }

        #endregion

        #region IQueue

        /// <summary>
        /// FIFO queue that discards least recent items if size limit is reached.
        /// </summary>
        private interface IQueue
        {
            bool IsEmpty { get; }
            void Enqueue(TSource item);
            TSource Dequeue();
        }

        /// <summary>
        /// <see cref="IQueue"/> implementations.
        /// </summary>
        private static class Queue
        {
            public static IQueue Create(int maxSize)
            {
                switch (maxSize)
                {
                    case 0: return Zero.Instance;
                    case 1: return new One();
                    default: return new Many(maxSize);
                }
            }

            private sealed class Zero : IQueue
            {
                // ReSharper disable once StaticMemberInGenericType
                public static Zero Instance { get; } = new Zero();
                private Zero() { }

                public bool IsEmpty => true;
                public void Enqueue(TSource item) { }
                public TSource Dequeue() { throw new InvalidOperationException(); }
            }

            private sealed class One : IQueue
            {
                private TSource _item;

                public bool IsEmpty { get; private set; } = true;

                public void Enqueue(TSource item)
                {
                    _item = item;
                    IsEmpty = false;
                }

                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();

                    var item = _item;
                    _item = default(TSource);
                    IsEmpty = true;
                    return item;
                }
            }

            private sealed class Many : IQueue
            {
                private readonly int _maxSize, _initialSize;
                private int _deq, _enq; // indices of deque and enqueu positions
                private TSource[] _buffer;

                public Many(int maxSize)
                {
                    if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize));

                    _maxSize = maxSize;
                    if (maxSize == int.MaxValue)
                        _initialSize = 4;
                    else
                    {
                        // choose an initial size that won't get us too close to maxSize when doubling
                        _initialSize = maxSize;
                        while (_initialSize >= 7)
                            _initialSize = (_initialSize + 1) / 2;
                    }
                }

                public bool IsEmpty { get; private set; } = true;

                public void Enqueue(TSource item)
                {
                    if (IsEmpty)
                    {
                        if (_buffer == null) _buffer = new TSource[_initialSize];
                        _buffer[0] = item;
                        _deq = 0;
                        _enq = 1;
                        IsEmpty = false;
                        return;
                    }
                    if (_deq == _enq) // full
                    {
                        if (_buffer.Length == _maxSize) // overwrite least recent
                        {
                            _buffer[_enq] = item;
                            if (++_enq == _buffer.Length) _enq = 0;
                            _deq = _enq;
                            return;
                        }

                        // increse buffer size
                        var newSize = _buffer.Length >= _maxSize / 2 ? _maxSize : 2 * _buffer.Length;
                        var newBuffer = new TSource[newSize];
                        var count = _buffer.Length - _deq;
                        Array.Copy(_buffer, _deq, newBuffer, 0, count);
                        Array.Copy(_buffer, 0, newBuffer, count, _deq);
                        _deq = 0;
                        _enq = _buffer.Length;
                        _buffer = newBuffer;
                    }
                    _buffer[_enq] = item;
                    if (++_enq == _buffer.Length) _enq = 0;
                }

                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();

                    var result = ReadAndClear(ref _buffer[_deq]);
                    if (++_deq == _buffer.Length) _deq = 0;
                    if (_deq == _enq)
                    {
                        IsEmpty = true;
                        if (_buffer.Length > _initialSize) _buffer = null;
                    }
                    return result;
                }

                private static TSource ReadAndClear(ref TSource item)
                {
                    var result = item;
                    item = default(TSource);
                    return result;
                }
            }
        }

        #endregion
    }
}
于 2016-10-15T12:03:36.117 に答える
0

さらに別の解決策。

これは、とが混在Taskしているため、きれいではありませんObservable。したがって、を使用して実際にテストすることはできませんReactiveTest(正直なところ、どちらかで「遅い」サブスクライバーを実装する方法はわかりませんReactiveTest)。

public static IObservable<T> ShedLoad<T>(this IObservable<T> source)
{
    return Observable.Create<T>(observer =>
    {
        Task task = Task.FromResult(0);
        return source.Subscribe(t =>
        {
            if(task.IsCompleted)
                task = Task.Run(() => observer.OnNext(t));
            else
                Debug.WriteLine("Skip, task not finished");
        }, observer.OnError, observer.OnCompleted);
    });
}

そこには競合状態があるのではないかと思いますが、速すぎて捨てる段階なら、多すぎても少なすぎてもかまいません。ああ、そしてそれぞれOnNextが(潜在的に)異なるスレッドで呼び出されます(私はSynchronizeの後ろに置くことができると思いますCreate)。

マテリアライズ拡張機能を正しく機能させることができなかったことを認めます(私はそれをに接続してからFromEventPattern(MouseMove)、意図的に遅いサブスクライブでサブスクライブしました、そして奇妙なことに、一度に1つではなく、イベントのバーストを通過させました)

于 2018-09-13T06:34:01.487 に答える