データベースに保存する必要があるデータを生成するシミュレーションがあります。
ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
{
ComplexDataSet cds = GenerateData(r);
SaveDataToDatabase(cds);
});
シミュレーションでは大量のデータが生成されるため、最初にデータを生成してからデータベースに保存するのは現実的ではなく (最大 1 GB のデータ)、データベースに 1 つずつ保存するのも意味がありません。 (実用的であるには小さすぎるトランザクション)。それらを制御されたサイズのバッチ挿入としてデータベースに挿入したい(1回のコミットで100など)。
ただし、並列コンピューティングに関する私の知識は理論的なものではないと思います。私はこれを思いつきました(ご覧のとおり、非常に欠陥があります):
DataBuffer buffer = new DataBuffer(...);
ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
{
ComplexDataSet cds = GenerateData(r);
buffer.SaveDataToBuffer(cds, i == r - 1);
});
public class DataBuffer
{
int count = 0;
int limit = 100
object _locker = new object();
ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataBagQueue{ get; set; }
public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
{
lock (_locker)
{
if(count >= limit)
{
ConcurrentBag<ComplexDataSet> dequeueRef;
if(ComplexDataBagQueue.TryDequeue(out dequeueRef))
{
Commit(dequeueRef);
}
_lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
ComplexDataSetsQueue.Enqueue(_lastItemRef);
count = 1;
}
else
{
// First time
if(_lastItemRef == null)
{
_lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
ComplexDataSetsQueue.Enqueue(_lastItemRef);
count = 1;
}
// If buffer isn't full
else
{
_lastItemRef.Add(data);
count++;
}
}
if(isfinalcycle)
{
// Commit everything that hasn't been committed yet
ConcurrentBag<ComplexDataSet> dequeueRef;
while (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
{
Commit(dequeueRef);
}
}
}
}
public void Commit(ConcurrentBag<ComplexDataSet> data)
{
// Commit data to database..should this be somehow in another thread or something ?
}
}
ご覧のとおり、キューを使用してバッファーを作成し、コミットするタイミングを手動で決定しています。ただし、これは私の問題に対する解決策としてはあまり効果的ではないと強く感じています。まず、ロックを正しく行っているかどうかわかりません。第二に、これが完全にスレッドセーフであるかどうか(またはまったく)もわかりません。
ちょっと見て、どうすればいいのかコメントしてもらえますか?または、これを行うための完全に優れた方法がある場合 (ある種の生産者と消費者の手法または何かを使用) ?
感謝と最高の願い、D.