1

これは、かなり高度な質問に要約されます。大規模なデータセットがあり、各行を (個別に) 検証する必要があります。Parallel.Foreach を使用して、その行で検証メソッドを呼び出したいと考えています。このメソッドはスレッドセーフであると想定してください。検証でエラーが返された場合は、このエラーでデータ行を更新する必要があります。バックグラウンドスレッドからこれを行うことは明らかにできません。ただし、このエラー処理を実装する最善の方法が何であるかはわかりません。これを実装する私の考えは、行 ID とエラーを BlockingCollection に格納することです。これは書き込みに対してスレッドセーフであるためです。次に、(バックグラウンド スレッドから) 常にポーリングし、バックグラウンド スレッドがデータを見つけたら、フォームを呼び出して現在の行を更新します。

Reactive Framework を使用して、これを行う簡単な方法があるかどうか疑問に思っています。基本的に、「監視」できるマルチプロデューサースレッドセーフコレクションが必要で、コレクションに新しい値が追加されると、メインスレッドで「OnNext」が実行されます-これは可能ですか? 理想的には、これが発生する頻度も制御できます (メイン スレッドのコールバックが複数の行を 2 ~ 3 秒ごとに更新するように、2 ~ 3 秒ごとなど)。

御時間ありがとうございます。

4

1 に答える 1

2

これはどのようにあなたの想像力を刺激しますか:

IObservable<bool> ValidateAsync(Row item)
{
    return Observable.Start(() => {
        // TODO: Figure out if the row is valid
        return true;
    }, Scheduler.TaskPoolScheduler);
}

myBigDataTable.ToObservable()
    .Select(x => ValidateAsync(x).Select(y => new { Row = x, IsValid = y }))
    .Merge(10 /* rows concurrently */)
    .ObserveOn(SynchronizationContext.Current /*assuming WinForms */)
    .Subscribe(x => {
        Console.WriteLine("Row {0} validity: {1}", x.Row, x.IsValid);
    });

ロックなし、ばかげたコンテナーなし、ブロッキングなし、100% スレッドセーフ。

于 2012-05-10T18:14:08.370 に答える