3

私は以下のタイプを持っています...

public class NewsFeed
{
    public event EventHandler<NewsItemEventArgs> NewItem;

    .....

}

public class NewsItemEventArgs : EventArgs
{
    public NewsItem Item;
    public NewsItemEventArgs(NewsItem newsItem)
    {
        Item = newsItem;
    }
}

public class NewsItem
{
    public int Id { get; set; }
    public string Title { get; set; }
    public string Body { get; set; }
}

NewsFeedのNewItemイベントは、NewsItemEventArgsタイプのeventArgsでイベントを発生させます。私のシステムでは、イベントはバーストで公開されます。たとえば、小さな1秒のウィンドウに10個のニュースアイテムがあり、その後60秒間はニュース記事がありません。RXを使用してこれらのバーストをスムーズにしたいので、UIニュース記事では、たとえば5秒のより定期的な間隔で一度に1つずつ到着するように見えます。

以下のようなものでオブザーバブルを作成する必要があることを私は知っています

_source = Observable.FromEventPattern<NewsItemEventArgs>(
                        h => _newsFeed.NewItem += h,
                        h => _newsFeed.NewItem -= h);

しかし、上記のバーストで発生するのではなく、イベントにドリップフィードされるように、オブザーバブルを変換してサブスクライブする方法がわかりません。

何か案は?

4

3 に答える 3

3

プロデューサーが時々遅くなり、出力が不安定になる可能性があるため、Zipはこの操作に最適な選択ではない可能性があります。

Rx 2.0では、正確なスケジューリングはDateTimeOffsetまだ不可能のようです。TimeSpanただし、今のところは機能します。TimeSpanオフセットを。に置き換えることで試すことができますDateTimeOffset

要約すると、2つの連続する値の間の最小間隔を指定できれば、バーストの問題を解決できます。

    static IObservable<T> DelayBetweenValues<T>(this IObservable<T> observable, TimeSpan interval, IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            var offset = TimeSpan.Zero;
            return observable
                .TimeInterval(scheduler)
                .Subscribe
                (
                    ts =>
                    {
                        if (ts.Interval < interval)
                        {
                            offset = offset.Add(interval);
                            scheduler.Schedule(offset, () => observer.OnNext(ts.Value));
                        }
                        else
                        {
                            offset = TimeSpan.Zero;
                            observer.OnNext(ts.Value);
                        }
                    }
                );
        });
    }

テスト:

        Observable.Interval(TimeSpan.FromSeconds(2.5))
                  .Do(_ => Console.WriteLine("Burst"))
                  .SelectMany(i => Enumerable.Range((int)i, 10))
                  .DelayBetweenValues(TimeSpan.FromSeconds(0.2), TaskPoolScheduler.Default)
                  .Subscribe(Console.WriteLine);
于 2012-09-29T14:56:24.583 に答える
2

シーケンスを、一定の間隔で値を生成する別のシーケンスで圧縮できます。

Observable<NewsItem> nis = _source
    .Zip(Observable.Timer(Timespan.FromSeconds(5), TimeSpan.FromSeconds(5)), (e, _) => e)
    .Select(eventArgs => eventArgs.Item);
于 2012-09-29T10:17:11.623 に答える
1

Astiから仕事への答えを得ることができませんでした。そこで、delayDurationSelectorを使用して文書化されていないDelayオーバーロードを試しました。タイマーでスケジューラーを使用するとどういうわけか正しく動作しませんが、問題がなければ機能します。

public static IObservable<T> DelayBetweenValues<T>(this IObservable<T> observable, TimeSpan interval,
    IScheduler scheduler)
{
        var offset =  TimeSpan.Zero;
        return observable
            .TimeInterval(scheduler)
            .Delay(ti =>
            {
                offset = (ti.Interval < interval) ? offset.Add(interval) : TimeSpan.Zero;
                return Observable.Timer(offset);
            })
            .Select(ti => ti.Value);
}
于 2013-12-17T21:32:10.937 に答える