17

Throttleメソッドは、他の人があまりにも早く従う場合、観察可能なシーケンスから値をスキップします。しかし、それらを遅らせる方法が必要です。つまり、アイテムをスキップせずに、アイテム間の最小遅延を設定する必要があります

実際の例: リクエストを 1 秒に 1 回しか受け付けない Web サービスがあります。単一またはバッチでリクエストを追加できるユーザーがいます。Rx がなければ、リストとタイマーを作成します。ユーザーがリクエストを追加したら、リストに追加します。タイマー イベントでは、リストが空かどうかを確認します。そうでない場合は、リクエストを送信し、対応するアイテムを削除します。ロックとそのすべてのもので。今、Rx を使用するSubjectと、ユーザーがリクエストを追加したときにアイテムを作成して追加できます。しかし、遅延を適用して Web サービスがフラッディングしないようにする方法が必要です。

私はRxを初めて使用するので、明らかな何かが欠けている可能性があります。

4

6 に答える 6

7

を使用して、必要なことを行うかなり簡単な方法がありますEventLoopScheduler

私は、0 ~ 3 秒ごとにランダムに値を生成するオブザーバブルから始めました。

var rnd = new Random();

var xs =
    Observable
        .Generate(
            0,
            x => x < 20,
            x => x + 1,
            x => x,
            x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0));

ここで、最後の値が 1 秒以内でない限り、この出力値をすぐに作成するには、次のようにしました。

var ys =
    Observable.Create<int>(o =>
    {
        var els = new EventLoopScheduler();
        return xs
            .ObserveOn(els)
            .Do(x => els.Schedule(() => Thread.Sleep(1000)))
            .Subscribe(o);
    });

これは、 上のソースを効果的に観察し、EventLoopSchedulerその後 1 秒ごとにスリープ状態にするため、ウェイクアップ後にOnNextのみ次のソースを開始できます。OnNext

このコードで動作することをテストしました:

ys
    .Timestamp()
    .Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0)
    .Subscribe(x => Console.WriteLine(x));

これが役立つことを願っています。

于 2012-07-26T02:28:07.827 に答える
5

簡単な拡張メソッドはどうですか:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}

使用法:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1));
于 2012-07-26T10:51:04.613 に答える
-1

監視可能なタイマーを使用してブロッキング キューから取得するのはどうですか? 以下のコードはテストされていませんが、私が何を意味するかを理解できるはずです...

//assuming somewhere there is 
BlockingCollection<MyWebServiceRequestData> workQueue = ...

Observable
  .Timer(new TimeSpan(0,0,1), new EventLoopScheduler())
  .Do(i => myWebService.Send(workQueue.Take()));

// Then just add items to the queue using workQueue.Add(...)
于 2012-07-25T16:50:26.053 に答える
-1
.Buffer(TimeSpan.FromSeconds(0.2)).Where(i => i.Any())
.Subscribe(buffer => 
{
     foreach(var item in buffer) Console.WriteLine(item)
});
于 2013-05-01T19:08:00.013 に答える