34

最初のイベントが受信されたときにデリゲートが呼び出され、その後のイベントが受信された場合は 1 秒間呼び出されないように、イベント ストリームを効果的に調整したいと考えています。そのタイムアウト (1 秒) が経過した後、後続のイベントが受信された場合は、デリゲートが呼び出されるようにします。

Reactive Extensions を使用してこれを行う簡単な方法はありますか?

サンプルコード:

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var builder = new StringBuilder();

    generator
        .Sample(TimeSpan.FromSeconds(1))
        .Finally(() => Console.WriteLine(builder.ToString()))
        .Subscribe(feed =>
                   builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}",
                                                    feed.Value,
                                                    feed.Timestamp.ToString("mm:ss.fff"),
                                                    DateTime.Now.ToString("mm:ss.fff"))));

    Console.ReadKey();
}

現在の出力:

Running...
Observed 064, generated at 41:43.602, observed at 41:43.602
Observed 100, generated at 41:44.165, observed at 41:44.602

しかし、私は観察したいです(タイムスタンプは明らかに変わります)

Running...
Observed 001, generated at 41:43.602, observed at 41:43.602
....
Observed 100, generated at 41:44.165, observed at 41:44.602
4

8 に答える 8

16

わかった、

ここには 3 つのシナリオがあります。

1) 毎秒イベント ストリームの 1 つの値を取得したいと考えています。つまり、1 秒あたりにより多くのイベントを生成すると、常により大きなバッファーが得られます。

observableStream.Throttle(timeSpan)

2) 2 番目のイベントが発生する前に生成された最新のイベントを取得したい: 他のイベントが破棄される。

observableStream.Sample(TimeSpan.FromSeconds(1))

3) 最後の 1 秒間に発生したすべてのイベントを取得したいと考えています。そしてそれは毎秒

observableStream.BufferWithTime(timeSpan)

4) 秒が経過するまで、すべての値で秒の間に何が起こるかを選択したい場合、結果が返されます

observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent)
于 2010-07-09T09:56:54.390 に答える
14

RXフォーラムの助けを借りて得たものは次のとおりです。

アイデアは、元のシーケンスが起動する一連の「チケット」を発行することです。これらの「チケット」は、チケット シーケンスにすぐに追加される最初のチケットを除いて、タイムアウトのために遅延されます。イベントが発生し、待機中のチケットがある場合、イベントはすぐに発火します。それ以外の場合は、チケットまで待機してから発火します。発火すると、次のチケットが発行されます...

チケットと元のイベントを組み合わせるには、コンビネータが必要です。残念ながら、「標準」の .CombineLatest は、以前に使用されたチケットやイベントで発生するため、ここでは使用できません。そのため、基本的にフィルター処理された .CombineLatest である独自のコンビネーターを作成する必要がありました。これは、組み合わせの両方の要素が「新鮮」である場合にのみ起動し、以前は返されませんでした。私はそれを .CombineVeryLatest 別名 .BrokenZip と呼んでいます;)

.CombineVeryLatest を使用すると、上記のアイデアは次のように実装できます。

    public static IObservable<T> SampleResponsive<T>(
        this IObservable<T> source, TimeSpan delay)
    {
        return source.Publish(src =>
        {
            var fire = new Subject<T>();

            var whenCanFire = fire
                .Select(u => new Unit())
                .Delay(delay)
                .StartWith(new Unit());

            var subscription = src
                .CombineVeryLatest(whenCanFire, (x, flag) => x)
                .Subscribe(fire);

            return fire.Finally(subscription.Dispose);
        });
    }

    public static IObservable<TResult> CombineVeryLatest
        <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
        IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
    {
        var ls = leftSource.Select(x => new Used<TLeft>(x));
        var rs = rightSource.Select(x => new Used<TRight>(x));
        var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
        var fltCmb = cmb
            .Where(a => !(a.x.IsUsed || a.y.IsUsed))
            .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
        return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
    }

    private class Used<T>
    {
        internal T Value { get; private set; }
        internal bool IsUsed { get; set; }

        internal Used(T value)
        {
            Value = value;
        }
    }

編集: これは、フォーラムで Andreas Köpf によって提案された CombineVeryLatest の別のよりコンパクトなバリエーションです。

public static IObservable<TResult> CombineVeryLatest
  <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
  IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
    return Observable.Defer(() =>
    {
        int l = -1, r = -1;
        return Observable.CombineLatest(
            leftSource.Select(Tuple.Create<TLeft, int>),
            rightSource.Select(Tuple.Create<TRight, int>),
                (x, y) => new { x, y })
            .Where(t => t.x.Item2 != l && t.y.Item2 != r)
            .Do(t => { l = t.x.Item2; r = t.y.Item2; })
            .Select(t => selector(t.x.Item1, t.y.Item1));
    });
}
于 2010-07-11T21:42:26.560 に答える
8

私は昨夜この同じ問題に苦しんでいましたが、よりエレガントな (または少なくとも短い) 解決策を見つけたと思います:

var delay = Observable.Empty<T>().Delay(TimeSpan.FromSeconds(1));
var throttledSource = source.Take(1).Concat(delay).Repeat();
于 2011-04-29T12:59:34.167 に答える
6

これは、 Rx フォーラムでこの質問への回答として投稿したものです。

更新: これは、1 秒以上の時間差でイベントが発生した場合に、イベントの転送を遅らせなくなった新しいバージョンです。

public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval)
{
    return Observable.CreateWithDisposable<T>(o =>
    {
        object gate = new Object();
        Notification<T> last = null, lastNonTerminal = null;
        DateTime referenceTime = DateTime.UtcNow - minInterval;
        var delayedReplay = new MutableDisposable();
        return new CompositeDisposable(source.Materialize().Subscribe(x =>
        {
            lock (gate)
            {
                var elapsed = DateTime.UtcNow - referenceTime;
                if (elapsed >= minInterval && delayedReplay.Disposable == null)
                {
                    referenceTime = DateTime.UtcNow;
                    x.Accept(o);
                }
                else
                {
                    if (x.Kind == NotificationKind.OnNext)
                        lastNonTerminal = x;
                    last = x;
                    if (delayedReplay.Disposable == null)
                    {
                        delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() =>
                        {
                            lock (gate)
                            {
                                referenceTime = DateTime.UtcNow;
                                if (lastNonTerminal != null && lastNonTerminal != last)
                                    lastNonTerminal.Accept(o);
                                last.Accept(o);
                                last = lastNonTerminal = null;
                                delayedReplay.Disposable = null;
                            }
                        }, minInterval - elapsed);
                    }
                }
            }
        }), delayedReplay);
    });
}

これは私の以前の試みでした:

var source = Observable.GenerateWithTime(1, 
    x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
    .Timestamp();

source.Publish(o =>
    o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1)))
).Run(x => Console.WriteLine(x));
于 2010-07-09T15:23:29.237 に答える
2

わかりました、ここに1つの解決策があります。私は特に好きではありませんが...まあ。

SkipWhile について教えてくれた Jon と、BufferWithTime について cRichter に感謝します。みんなありがとう。

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1));

    var action = new Action<Timestamped<int>>(
        feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}",
                                  feed.Value,
                                  feed.Timestamp.ToString("mm:ss.fff"),
                                  DateTime.Now.ToString("mm:ss.fff")));

    var reactImmediately = true;
    bufferedAtOneSec.Subscribe(list =>
                                   {
                                       if (list.Count == 0)
                                       {
                                           reactImmediately = true;
                                       }
                                       else
                                       {
                                           action(list.Last());
                                       }
                                   });
    generator
        .SkipWhile(item => reactImmediately == false)
        .Subscribe(feed =>
                       {
                           if(reactImmediately)
                           {
                               reactImmediately = false;
                               action(feed);
                           }
                       });

    Console.ReadKey();
}
于 2010-07-09T12:44:20.293 に答える
0

あなたが探しているのは、CombineLatest です。

public static IObservable<TResult> CombineLatest<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector
)

セレクター(時間)に値がある場合、2つのオブザーバブルをマージし、すべての値を返します。

編集:ジョンは正しいです、それはおそらく好ましい解決策ではありません

于 2010-07-09T09:03:41.977 に答える
0

Throttle拡張方法を試しましたか?

ドキュメントから:

dueTime の前に別の値が続くオブザーバブル シーケンスの値を無視します。

最初の値ではなく次の値を無視したいという点で、それがあなたが望むことをするかどうかは私にははっきりしていません...しかし、私はそれがあなたが望むものであると期待しています. 試してみる :)

編集: うーん...いいえ、結局のところ、それは正しいことThrottle とは思いません。あなたがやりたいことはわかると思いますが、それを行うためのフレームワークには何も見えません。私は何かを逃したかもしれません。Rxフォーラムで質問しましたか? それが今そこにない場合、彼らはそれを追加して喜んでいるかもしれません:)

狡猾SkipUntilそしてどういうわけか...できるSelectManyと思いますが、それは独自の方法で行うべきだと思います。

于 2010-07-09T09:01:58.460 に答える