5

一連の株価ティックが入ってきて、過去 1 時間のすべてのデータを取得して処理したいと考えています。リアクティブ拡張機能 2.0 でこれを達成しようとしています。別の投稿で Interval を使用することを読みましたが、それは非推奨だと思います。

4

4 に答える 4

9

この拡張メソッドはあなたの問題を解決しますか?

public static IObservable<T[]> RollingBuffer<T>(
    this IObservable<T> @this,
    TimeSpan buffering)
{
    return Observable.Create<T[]>(o =>
    {
        var list = new LinkedList<Timestamped<T>>();
        return @this.Timestamp().Subscribe(tx =>
        {
            list.AddLast(tx);
            while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
            {
                list.RemoveFirst();
            }
            o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
        }, ex => o.OnError(ex), () => o.OnCompleted());
    });
}
于 2012-07-25T08:54:02.653 に答える
4

ウィンドウオペレーターを探しています!これは、偶然のシーケンス(シーケンスの重複ウィンドウ)の操作について書いた長い記事です http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

したがって、ローリング平均を作成したい場合は、この種のコードを使用できます

var scheduler = new TestScheduler();
var notifications = new Recorded<Notification<double>>[30];
for (int i = 0; i < notifications.Length; i++)
{
  notifications[i] = new Recorded<Notification<double>>(i*1000000, Notification.CreateOnNext<double>(i));
}
//Push values into an observable sequence 0.1 seconds apart with values from 0 to 30
var source = scheduler.CreateHotObservable(notifications);

source.GroupJoin(
      source,   //Take values from myself
      _=>Observable.Return(0, scheduler), //Just the first value
      _=>Observable.Timer(TimeSpan.FromSeconds(1), scheduler),//Window period, change to 1hour
      (lhs, rhs)=>rhs.Sum())    //Aggregation you want to do.
    .Subscribe(i=>Console.WriteLine (i));
scheduler.Start();

また、値を受け取ると、ローリング サムが出力されることがわかります。

0、1、3、6、10、15、21、28...

于 2012-07-25T11:14:36.897 に答える
1

おそらくBufferあなたが探しているものです:

var hourlyBatch = ticks.Buffer(TimeSpan.FromHours(1));
于 2012-07-19T15:48:30.150 に答える