レポートの目的で時間ベースのイベントを集約するために使用されるC#(.NET 4.5)アプリケーションを作成しています。クエリロジックをリアルタイムデータと履歴データの両方で再利用できるようにするために、Reactive Extensions(2.0)とそのIScheduler
インフラストラクチャ(HistoricalScheduler
およびその仲間)を利用します。
たとえば、イベントのリストを作成するとします(時系列で並べ替えられますが、一致する場合があります!)。その唯一のペイロードはタイムスタンプであり、固定期間のバッファー間での分布を知りたい場合です。
const int num = 100000;
const int dist = 10;
var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();
var time = new HistoricalScheduler(curr);
for (int i = 0; i < num; i++)
{
events.Add(curr);
curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}
var stream = Observable.Generate<int, DateTimeOffset>(
0,
s => s < events.Count,
s => s + 1,
s => events[s],
s => events[s],
time);
stream.Buffer(TimeSpan.FromMilliseconds(num), time)
.Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));
time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));
このコードを実行するSystem.StackOverflowException
と、次のスタックトレースが生成されます(最後の3行が最後まで続きます)。
mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes
...
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes
...
Observable.Generate()
わかりました。問題は、リストのサイズ()に応じて、またnum
スケジューラーの選択に関係なく、私の使用に起因しているようです。
私は何が間違っているのですか?または、より一般的には、独自のタイムスタンプを提供するイベントのIObservable
からを作成するための好ましい方法は何でしょうか。IEnumerable