6

たくさんのイベントが入ってきて、それらすべてを損失なく実行する必要がありますが、それらが適切なタイムスロットでバッファリングされ、消費されるようにしたいと考えています。誰にも解決策がありますか?

イベントを失うことなくそれを行うことができる Rx のオペレーターを見つけることができません (スロットル - イベントを失います)。Buffered、Delayなども検討しました...良い解決策が見つかりません。

真ん中にタイマーを入れようとしましたが、どういうわけかまったく機能しません:

GetInitSequence()
            .IntervalThrottle(TimeSpan.FromSeconds(5))
            .Subscribe(
                item =>
                    {
                        Console.WriteLine(DateTime.Now);
                        // Process item
                    }
            );

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
    {
        return Observable.Create<T>(o =>
            {
                return source.Subscribe(x =>
                    {
                        new Timer(state => 
                            o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1));
                    }, o.OnError, o.OnCompleted);
        });
    }
4

4 に答える 4

13

質問は100%明確ではないので、いくつかの推測をしています。

Observable.Delay処理のために均等な時間間隔を作成するのではなく、各イベントが到着したときから遅延が発生するため、これは望ましくありません。

Observable.Buffer一度に1つではなく、指定された間隔ごとにすべてのイベントが渡されるため、これは望ましくありません。

ですから、あなたは、時を刻むある種のメトロノームを作成し、刻みごとにイベントを提供するソリューションを探していると思います。これは、メトロノームを使用してソースに接続するために単純に構築できます。Observable.IntervalZip

var source = GetInitSequence();
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));    
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now));

これは 5 秒ごとにトリガーされ (上の例では)、元のアイテムが順番に表示されます。

このソリューションの唯一の問題は、ソース要素が (たとえば) 10 秒間それ以上ない場合、ソース要素が到着するとすぐに送信されることです。 . そのシナリオのマーブル ダイアグラム:

source:  -a-b-c----------------------d-e-f-g
trigger: ----o----o----o----o----o----o----o
result:  ----a----b----c-------------d-e-f-g

これは非常に合理的な問題です。ここには、すでにそれに取り組む2つの質問があります。

イベントのバーストをスムーズにする Rx IObservable バッファリング

バッファリングされたイベントを等間隔でプッシュする方法

提供されるソリューションは、メインのDrain拡張メソッドとセカンダリのBuffered拡張です。これらをはるかに単純になるように変更しました ( は不要でDrain、 を使用するだけConcatです)。使用法は次のとおりです。

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5));

拡張方法StepInterval:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}
于 2012-07-01T21:55:07.900 に答える
1

これは単純すぎるかもしれませんが、これでうまくいくでしょうか?

var intervaled = source.Do(x => { Thread.Sleep(100); });

基本的に、これは値の間に最小限の遅延を置くだけです。単純すぎる?

于 2012-07-02T01:23:31.857 に答える
0

Observable.Bufferはどうですか?これにより、1 秒ウィンドウ内のすべてのイベントが 1 つのイベントとして返されます。

var xs = Observable.Interval(TimeSpan.FromMilliseconds(100));
var bufferdStream = xs.Buffer(TimeSpan.FromSeconds(5));
bufferdStream.Subscribe(item => { Console.WriteLine("Number of events in window: {0}", item.Count); });

あなたが求めているのはそれほど明確ではないかもしれません。あなたのコードは何をすべきですか?イベントごとにタイマーを作成して遅延しているようです。また、次の完了が次の前に発生する可能性があるため、オブザーバブルのセマンティクスが壊れます。

これは、使用されるタイマーでのみ正確であることに注意してください。通常、タイマーの精度は最大で 16 ミリ秒です。

編集:

あなたの例は次のようになり、アイテムにはウィンドウ内のすべてのイベントが含まれます。

GetInitSequence()
            .Buffer(TimeSpan.FromSeconds(5))
            .Subscribe(
                item =>
                    {
                        Console.WriteLine(DateTime.Now);
                        // Process item
                    }
            );
于 2012-07-01T18:03:44.667 に答える