4

これは、見逃した非常に単純な拡張メソッドである可能性があると感じていますが、見つけることができません...

基本的に、新しい値ごとに値がゆっくりと増加するストリームを生成するストリームを取得したいと考えています。これを時間ではなく「許容範囲」で調整/サンプリングしたい。例えば

var ob = Enumerable.Range(0, 30).ToObservable(); // 0, 1, 2, 3, 4, 5,....., 30
var largeMovingOb = ob.WhenChangedBy(10); // 0, 10, 20, 30

[1, 4, 20, 33] などのシーケンスがあり、値が最後の値の 15 を超えて変更されたときに出力したい場合 - [1, 20] になります。値 12 による変更は、[1, 20, 33] になります。

このための組み込みの Rx 拡張機能はありますか? 理想的には、それぞれのオーバーロードを記述することなく、すべての数値型で機能します。

4

4 に答える 4

9

これにはぴったりだと思いますObservable.Scan

var ob = Enumerable.Range(0, 30).ToObservable();
var largeMovingOb = ob.Scan((acc, i) => acc + 10 > i ? acc : i)
  .DistinctUntilChanged();
于 2012-11-22T00:22:14.933 に答える
0

あなたが望むことをする組み込みの演算子があります。

これを試して:

var ob = Observable.Range(0, 30);
var largeMovingOb = ob.DistinctUntilChanged(x => x / 10);

署名は次のようになるため、数値型だけでなく、任意の型で機能します。

IObservable<TSource> DistinctUntilChanged<TSource, TKey>(
    this IObservable<TSource> source, Func<TSource, TKey> keySelector)

単純。

于 2012-11-22T00:43:15.337 に答える
0

再利用できるもう 1 つの組み込み演算子はDistinctUntilChangedで、最後の値を追跡します。ここでの最大の「ハック」は、IEqualityComparerが標準的な平等の期待に従わない可能性があることです (関数によってa == b && b == cは意味しません)。a == c

public static IObservable<T> DistinctUntilChangedBy<T>(
    this IObservable<T> source, Func<T, T, bool> isChanged)
{
    //check arguments
    return source.DistinctUntilChanged(new MarginEqualityComparer<T>(isChanged));
}

class MarginEqualityComparer<T> : IEqualityComparer<T>
{
    MarginEqualityComparer(Func<T, T, bool> comparer)
    {
        _comparer = comparer;
    }

    private readonly Func<T, T, bool> _comparer;

    public bool Equals(T x, T y)
    {
        return _comparer(x, y);
    }

    public int GetHashCode(T obj)
    {
        throw new NotSupportedException("This comparer does not support hashing.");
    }
}
于 2012-11-21T18:07:05.610 に答える
-1

Where拡張メソッドonを使用してIObservable<T>、最後に生成したものを追跡し、値が最後に生成された値を許容レベルだけ超えた場合にのみ、述語が true を返すようにすることができます。

これは、次のように、クロージャーを利用してこれを行う拡張メソッドにラップできます。

public static IObservable<int> WhenLastObservedChangesByMoreThan(
    this IObservable<int> observable, int tolerance)
{
    // Validate parameters.
    if (observable == null) throw new ArgumentNullException("observable");

    // Tolerance must be positive, so comparisons are correct after
    // addition/subtraction.
    if (tolerance < 0) 
        throw new ArgumentOutOfRangeExeption("tolerance", tolerance,
            "The tolerance parameter must be a non-negative number.");

    // Shortcut: If tolerance is 0, then every value is returned, just
    // return the observable.
    if (tolerance == 0) return observable;

    // The last value yielded.
    int? lastYielded = null;

    // Filter.
    observable = observable.Where(i => {
        // If there is a previous value
        // that was yielded.
        if (lastYielded != null)
        {
            // Is the last value within
            // tolerance?
            if (i - tolerance < i && i < i + tolerance)
            {
                // Do not process.
                return false;
            }
        }

        // This is being yielded, store the value.
        lastYielded = i;

        // Yield the value.
        return true;
    });
}

上記はスレッドセーフではないことに注意してください。複数のスレッドからIObservable<T>呼び出している場合は、変数OnNextへのアクセスをロックする必要があります (ステートメントで行うのは簡単です)。lastYieldedlock

于 2012-11-21T17:28:22.407 に答える