10

レポートの目的で時間ベースのイベントを集約するために使用されるC#(.NET 4.5)アプリケーションを作成しています。クエリロジックをリアルタイムデータと履歴データの両方で再利用できるようにするために、Reactive Extensions(2.0)とそのISchedulerインフラストラクチャ(HistoricalSchedulerおよびその仲間)を利用します。

たとえば、イベントのリストを作成するとします(時系列で並べ替えられますが、一致する場合があります!)。その唯一のペイロードはタイムスタンプであり、固定期間のバッファー間での分布を知りたい場合です。

const int num = 100000;
const int dist = 10;

var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();

var time = new HistoricalScheduler(curr);

for (int i = 0; i < num; i++)
{
    events.Add(curr);
    curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}

var stream = Observable.Generate<int, DateTimeOffset>(
    0,
    s => s < events.Count,
    s => s + 1,
    s => events[s],
    s => events[s],
    time);

stream.Buffer(TimeSpan.FromMilliseconds(num), time)
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));

このコードを実行するSystem.StackOverflowExceptionと、次のスタックトレースが生成されます(最後の3行が最後まで続きます)。

mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes    
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes    
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes    
...
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes    
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes    
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes    
...

Observable.Generate()わかりました。問題は、リストのサイズ()に応じて、またnumスケジューラーの選択に関係なく、私の使用に起因しているようです。

私は何が間違っているのですか?または、より一般的には、独自のタイムスタンプを提供するイベントのIObservableからを作成するための好ましい方法は何でしょうか。IEnumerable

4

2 に答える 2

4

(更新-代替手段を提供していないことに気づきました:回答の下部を参照してください)

問題は、どのように機能するかです。これは、引数に基づいてコアカーシブObservable.Generate(再帰が裏返されたと考えてください) ジェネレーターを展開するために使用されます。これらの引数が非常にネストされたコアカーシブ ジェネレーターを生成することになる場合は、スタックを吹き飛ばします。

この時点から、私は多くのことを推測しています (私の前に Rx ソースはありません) (以下を参照)。

initial_state =>
generate_next(initial_state) => 
generate_next(generate_next(initial_state)) => 
generate_next(generate_next(generate_next(initial_state))) =>
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ...

そして、コール スタックがオーバーフローするほど大きくなるまで、何度も何度も繰り返します。たとえば、メソッド シグネチャ + int カウンターでは、再帰呼び出しごとに 8 ~ 16 バイトのようなものになります (ステート マシン ジェネレーターの実装方法によってはさらに多くなります)。深さ)

編集: ソースをプルアップ - 確認: Generate の "Run" メソッドは次のようになります - へのネストされた呼び出しに注意してくださいGenerate:

protected override IDisposable Run(
    IObserver<TResult> observer, 
    IDisposable cancel, 
    Action<IDisposable> setSink)
{
    if (this._timeSelectorA != null)
    {
        Generate<TState, TResult>.α α = 
                new Generate<TState, TResult>.α(
                     (Generate<TState, TResult>) this, 
                     observer, 
                     cancel);
        setSink(α);
        return α.Run();
    }
    if (this._timeSelectorR != null)
    {
        Generate<TState, TResult>.δ δ = 
               new Generate<TState, TResult>.δ(
                   (Generate<TState, TResult>) this, 
                   observer, 
                   cancel);
        setSink(δ);
        return δ.Run();
    }
    Generate<TState, TResult>._ _ = 
             new Generate<TState, TResult>._(
                  (Generate<TState, TResult>) this, 
                  observer, 
                  cancel);
    setSink(_);
    return _.Run();
}

編集:Derp、代替手段を提供しませんでした...これはうまくいくかもしれません:

(編集:固定Enumerable.Rangeなので、ストリームサイズは倍になりませんchunkSize

const int num = 160000;
const int dist = 10;

var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();
var time = new HistoricalScheduler(curr);

for (int i = 0; i < num; i++)
{
    events.Add(curr);
    curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}

    // Size too big? Fine, we'll chunk it up!
const int chunkSize = 10000;
var numberOfChunks = events.Count / chunkSize;

    // Generate a whole mess of streams based on start/end indices
var streams = 
    from chunkIndex in Enumerable.Range(0, (int)Math.Ceiling((double)events.Count / chunkSize) - 1)
    let startIdx = chunkIndex * chunkSize
    let endIdx = Math.Min(events.Count, startIdx + chunkSize)
    select Observable.Generate<int, DateTimeOffset>(
        startIdx,
        s => s < endIdx,
        s => s + 1,
        s => events[s],
        s => events[s],
        time);

    // E pluribus streamum
var stream = Observable.Concat(streams);

stream.Buffer(TimeSpan.FromMilliseconds(num), time)
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));
于 2012-11-20T21:54:50.493 に答える
3

OK、状態遷移としてラムダ式を必要としない別のファクトリ メソッドを使用しましたが、スタック オーバーフローはもう見られません。これが私の質問に対する正しい答えと見なされるかどうかはまだわかりませんが、うまくいくので、ここで共有したいと思いました。

var stream = Observable.Create<DateTimeOffset>(o =>
    {
        foreach (var e in events)
        {
            time.Schedule(e, () => o.OnNext(e));
        }

        time.Schedule(events[events.Count - 1], () => o.OnCompleted());

        return Disposable.Empty;
    });

サブスクリプションを返す (!) 前にイベントを手動でスケジュールするのは面倒に思えますが、この場合はラムダ式内で実行できます。

このアプローチに問題がある場合は、修正してください。System.Reactiveまた、元のコードでどのような暗黙の仮定に違反したかを聞いていただければ幸いです。

(ああ、もっと早くチェックしておくべきでした。RX v1.0 では、オリジナルObservable.Generate()は実際に動作しているようです。)

于 2012-11-20T21:13:17.310 に答える