大きなデータセットを処理し、結果のデータを RavenDb にインポートするコードの並列化を行う良い方法を見つけようとしています。
データ処理は CPU バウンドであり、データベース インポート IO バウンドです。
Environment.ProcessorCount スレッド数で並列処理を行うソリューションを探しています。結果のデータは、上記のプロセスと並行して、x (10 としましょう) のプールされたスレッドで RavenDb にインポートする必要があります。
ここでの主なことは、インポートが完了するのを待っている間に次のデータセットの処理が続行されるように、完了したデータがインポートされている間に処理を続行することです。
もう 1 つの問題は、プライベート ワーキング メモリが 5GB を超える可能性があるため、インポートが成功した後に各バッチのメモリを破棄する必要があることです。
以下のコードは、これまでに得たものです。上記の並列化要件を満たさないことに注意してください。
datasupplier.GetDataItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.ForAll(batch =>
{
Task.Run(() =>
{
...
}
}
GetDataItem は、バッチ データセットに分割された列挙可能なデータ項目を生成します。GetDataItem は、約 2,000,000 個のアイテムを生成し、それぞれ平均約 0.3 ミリ秒の処理を行います。
このプロジェクトは、x64 プラットフォーム上の最新の .NET 4.5 RC で実行されています。
アップデート。
私の現在のコード (上記参照) は、アイテムを取得し、それらをバッチで分割します。各バッチは、8 つのスレッド (i7 では Environment.ProcessorCount) で並行して処理されます。処理は遅く、CPU バウンドで、メモリを大量に消費します。1 つのバッチの処理が完了すると、結果のデータを RavenDb に非同期にインポートするタスクが開始されます。バッチ インポート ジョブ自体は同期的で、次のようになります。
using (var session = Store.OpenSession())
{
foreach (var data in batch)
{
session.Store(data);
}
session.SaveChanges();
}
このアプローチにはいくつかの問題があります。
バッチが完了するたびに、インポート ジョブを実行するタスクが開始されます。並行して実行するタスクの数を制限したい (例: 最大 10)。さらに、多くのタスクが開始されていても、それらが並行して実行されることはないようです。
メモリ割り当ては大きな問題です。バッチが処理/インポートされると、まだメモリに残っているようです。
上記の問題を解決する方法を探しています。理想的には私が欲しい:
- 負荷の高いデータのバッチ処理を行う論理プロセッサごとに 1 つのスレッド。
- 完了したバッチが RavenDb にインポートされるのを待っている 10 ほどの並列スレッド。
- メモリの割り当てを最小限に抑えるため、インポート タスクの完了後にバッチの割り当てを解除します。
- バッチ処理用のスレッドの 1 つでインポート ジョブを実行しない。完了したバッチのインポートは、処理中の次のバッチと並行して実行する必要があります。
解決
var batchSize = 10000;
var bc = new BlockingCollection<List<Data>>();
var importTask = Task.Run(() =>
{
bc.GetConsumingEnumerable()
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.ForAll(batch =>
{
using (var session = Store.OpenSession())
{
foreach (var i in batch) session.Store(i);
session.SaveChanges();
}
});
});
var processTask = Task.Run(() =>
{
datasupplier.GetDataItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.ForAll(batch =>
{
bc.Add(batch.Select(i => new Data()
{
...
}).ToList());
});
});
processTask.Wait();
bc.CompleteAdding();
importTask.Wait();