4

C# で Reactive Extensions (Rx) を使用しており、次の方法でイベントをフィルター処理したいと考えています。次のオリジネーター シーケンスがあるとします。

ABCDEF X GHI X J XX KLMNO X P

そして、次の出力を生成したいと思います。

EF X HI X J XX NO X

基本的に、最大境界 (この例ではこの境界は 2) でイベントをバッファリング (スロットル?) し、特定のイベント (この場合はイベント X) を取得すると、そのバッファを出力にフラッシュしてバッファリングを開始します。特別なイベントをもう一度見るまで。

私はいくつかのアプローチを試みていますが、運がありません。私が見逃していることを達成する簡単な方法があるはずだと思います。

編集: 1 つの制約は、破棄される大量のイベントと、X の数個のインスタンスのみを取得することを期待しているため、最後の 2 (または 20) のみを読み取るために数千のイベントを含むバッファーをメモリに保持することは、実際にはそうではありません。オプション。

4

3 に答える 3

1

ここに投稿した別の回答に便乗します: Rxでのスライディングウィンドウの実装の問題

重要な点は、この拡張メソッドです。

public static class Ext
{
    public static IObservable<IList<T>> SlidingWindow<T>(
        this IObservable<T> src, 
        int windowSize)
    {
        var feed = src.Publish().RefCount();    
        // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list  
        return Observable.Zip(
        Enumerable.Range(0, windowSize)
            .Select(skip => feed.Skip(skip))
            .ToArray());
    }
}

このように使用できるもの:

void Main()
{
    // A faked up source
    var source = new Subject<char>();

    var bufferSize = 2;
    Func<char, bool> eventTrigger = c => c == 'X';

    var query = source
        .Publish()
        .RefCount()
        // Want one extra slot to detect the "event"
        .SlidingWindow(bufferSize + 1)
        .Where(window => eventTrigger(window.Last()))
        .Select(buffer => buffer.ToObservable())
        .Switch();

    using(query.Subscribe(Console.WriteLine))
    {
        source.OnNext('A');
        source.OnNext('B');
        source.OnNext('C');
        source.OnNext('D');
        source.OnNext('E');
        source.OnNext('F');
        source.OnNext('X');
        source.OnNext('G');
        source.OnNext('H');
        source.OnNext('I');
        source.OnNext('X');
        Console.ReadLine();
    }    
}

出力:

E
F
X
H
I
X
于 2013-03-13T15:43:01.970 に答える
1

便宜上、次の 2 つの拡張関数が必要です。

public static class Extensions
{
    public static IObservable<IList<TSource>> BufferUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
    {
        var published = source.Publish().RefCount();
        return published.Buffer(() => published.Where(predicate));
    }

    public static IEnumerable<TSource> TakeLast<TSource>(this IEnumerable<TSource> source, int count)
    {
        return source.Reverse().Take(count).Reverse();
    }
}

次に、次のように問題を解決します。

source.BufferUntil(c => c == 'X')
    .SelectMany(list => list.TakeLast(3))

出力:

E F X H I X J X X N O X
于 2013-03-13T14:24:44.547 に答える
1

これは私自身の質問に答えるための刺し傷です。問題がある場合はお知らせください。

public static class ObservableHelper
{
    /// <summary>
    /// Buffers entries that do no satisfy the <paramref name="shouldFlush"/> condition, using a circular buffer with a max
    /// capacity. When an entry that satisfies the condition ocurrs, then it flushes the circular buffer and the new entry,
    /// and starts buffering again.
    /// </summary>
    /// <typeparam name="T">The type of entry.</typeparam>
    /// <param name="stream">The original stream of events.</param>
    /// <param name="shouldFlush">The condition that defines whether the item and the buffered entries are flushed.</param>
    /// <param name="bufferSize">The buffer size for accumulated entries.</param>
    /// <returns>An observable that has this filtering capability.</returns>
    public static IObservable<T> FlushOnTrigger<T>(this IObservable<T> stream, Func<T, bool> shouldFlush, int bufferSize)
    {
        if (stream == null) throw new ArgumentNullException("stream");
        if (shouldFlush == null) throw new ArgumentNullException("shouldFlush");
        if (bufferSize < 1) throw new ArgumentOutOfRangeException("bufferSize");

        return System.Reactive.Linq.Observable.Create<T>(observer =>
        {
            var buffer = new CircularBuffer<T>(bufferSize);
            var subscription = stream.Subscribe(
                newItem =>
                    {
                        bool result;
                        try
                        {
                            result = shouldFlush(newItem);
                        }
                        catch (Exception ex)
                        {
                            return;
                        }

                        if (result)
                        {
                            foreach (var buffered in buffer.TakeAll())
                            {
                                observer.OnNext(buffered);
                            }

                            observer.OnNext(newItem);
                        }
                        else
                        {
                            buffer.Add(newItem);
                        }
                    },
                observer.OnError,
                observer.OnCompleted);

            return subscription;
        });
    }
}

ちなみに、CircularBuffer はそのままでは存在しませんが、実装は簡単です。

次に、次のように呼び出します。

        data
            .FlushOnTrigger(item => item == 'X', bufferSize: 2)
            .Subscribe(Console.WriteLine);
于 2013-03-13T21:37:08.113 に答える