13

SlidingWindow移動平均などを簡単に監視したいので、リアクティブエクステンションの演算子を作成しました。簡単な例として、マウスイベントを聞くためにサブスクライブしたいのですが、イベントがあるたびに、最後の3つを受け取りたいです( 3つおきのイベントが最後の3つを受け取るのを待っています)。そのため、私が見つけたウィンドウのオーバーロードでは、箱から出して必要なものが得られないようです。

これが私が思いついたものです。リスト操作が頻繁に行われることを考えると、これが最もパフォーマンスの高いソリューションではない可能性があることを恐れています。

public static IObservable<List<T>> SlidingWindow<T>(this IObservable<T> seq, int length)
{
    var seed = new List<T>();

    Func<List<T>, T, List<T>> accumulator = (list, arg2) =>
    {
        list.Add(arg2);

        if (list.Count > length)
            list.RemoveRange(0, (list.Count - length));

        return list;
    };

    return seq.Scan(seed, accumulator)
                .Where(list => list.Count == length);
}

これは次のように呼び出すことができます。

var rollingSequence = Observable.Range(1, 5).SlidingWindow().ToEnumerable();

しかし、驚いたことに、期待される結果を受け取る代わりに

1,2,3
2,3,4
3,4,5

結果を受け取ります

2,3,4
3,4,5
3,4,5

どんな洞察も大歓迎です!

4

4 に答える 4

16

カウントに3の引数を指定して、元のテストを使用すると、目的の結果が得られます。

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> source, int count)
{
    return source.Buffer(count, 1)
                 .Where(list => list.Count == count);
}

このようなテスト:

var source = Observable.Range(1, 5);
var query = source.SlidingWindow(3);
using (query.Subscribe(i => Console.WriteLine(string.Join(",", i))))
{

}

出力:

1,2,3
2,3,4
3,4,5
于 2013-05-17T15:07:08.927 に答える
8

ただsource.Window(count, 1)-またはsource.Buffer(count, 1) それは「カウント」アイテムのウィンドウ/バッファであり、1つずつスライドします。

于 2014-03-05T00:32:49.927 に答える
6

ここでのスライディングウィンドウの実装は、私のスライディングウィンドウのアイデアには不十分でした。最も近いものは使用してBuffer(N, 1)いましたが、最初の結果を出力する前に最初のN個のアイテムを待機してから、シーケンスの最後を超えてスライドするため、問題が発生します。一度に最大N個のアイテムを放出したい。

私はこの実装に行き着きました:

public static IObservable<IList<T>> SlidingWindow<T>(this IObservable<T> obs, int windowSize) =>
    Observable.Create<IList<T>>(observer =>
    {
        var buffer = new CircularBuffer<T>(windowSize);
        return obs.Subscribe(
            value =>
            {
                buffer.Add(value);
                observer.OnNext(buffer.ToList());
            },
            ex => observer.OnError(ex),
            () => observer.OnCompleted()
        );
    });

最初はバッファにキューを使用していましたが、もう少し軽量なものを使用したかったのです。

public class CircularBuffer<T> : IReadOnlyList<T>
{
    private readonly T[] buffer;
    private int offset;
    private int count;
    public CircularBuffer(int bufferSize) => this.buffer = new T[bufferSize];
    public int Capacity => buffer.Length;
    public int Count => count;
    public T this[int index] => index < 0 || index >= count
        ? throw new ArgumentOutOfRangeException(nameof(index))
        : buffer[(offset + index) % buffer.Length];
    public void Add(T value)
    {
        buffer[(offset + count) % buffer.Length] = value;
        if (count < buffer.Length) count++;
        else offset = (offset + 1) % buffer.Length;
    }
    public IEnumerator<T> GetEnumerator()
    {
        for (var i = 0; i < count; i++)
            yield return buffer[(offset + i) % buffer.Length];
    }
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

次のシーケンスが生成されObservable.Range(0, 10).SlidingWindow(3)ます:

 0,1,2,3,4,5,6,7,8,9
[0]
[0,1]
[0,1,2]
  [1,2,3]
    [2,3,4]
      [3,4,5]
        [4,5,6]
          [5,6,7]
            [6,7,8]
              [7,8,9]
于 2020-01-24T22:29:14.053 に答える
5

代わりにこれを試してください-座って相対的なパフォーマンスについて考える必要がありますが、少なくとも同じくらい良いと 思われ、読みやすくなっています。

public static IObservable<IList<T>> SlidingWindow<T>(
       this IObservable<T> src, 
       int windowSize)
{
    var feed = src.Publish().RefCount();    
    // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list  
    return Observable.Zip(
       Enumerable.Range(0, windowSize)
           .Select(skip => feed.Skip(skip))
           .ToArray());
}

テストリグ:

var source = Observable.Range(0, 10);
var query = source.SlidingWindow(3);
using(query.Subscribe(Console.WriteLine))
{               
    Console.ReadLine();
}

出力:

ListOf(0,1,2)
ListOf(1,2,3)
ListOf(2,3,4)
ListOf(3,4,5)
ListOf(4,5,6)
...

編集:余談ですが、私は.Publish().RefCount()それをしないことによって一度やけどを負ったので、私は自分自身を強制的にしていることに気づきます...私はそれがここで厳密に要求されているとは思いません。

yzorgの編集:

このようにメソッドを拡張すると、実行時の動作がより明確になります。

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> src, 
    int windowSize)
{
    var feed = src.Publish().RefCount();    
    // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list  
    return Observable.Zip(
    Enumerable.Range(0, windowSize)
        .Select(skip => 
        {
            Console.WriteLine("Skipping {0} els", skip);
            return feed.Skip(skip);
        })
        .ToArray());
}
于 2013-03-06T20:20:08.880 に答える