今週初めに Stackoverflow でいくつかの助けを得て、処理をロードし、大きなデータセットを RavenDb にインポートするためのプロデューサー/コンシューマー パターンを進めました。 IO バウンドで継続する CPU バウンド タスクの並列化
現在、メモリ消費を管理するために、プロデューサーが事前に準備する作業単位の量を調整しようとしています。基本的なセマフォを使用してスロットリングを実装しましたが、特定の時点で実装がデッドロックするという問題が発生しています。
何がデッドロックを引き起こしているのかわかりません。以下はコードの抜粋です。
private static void LoadData<TParsedData, TData>(IDataLoader<TParsedData> dataLoader, int batchSize, Action<IndexedBatch<TData>> importProceedure, Func<IEnumerable<TParsedData>, List<TData>> processProceedure)
where TParsedData : class
where TData : class
{
Console.WriteLine(@"Loading {0}...", typeof(TData).ToString());
var batchCounter = 0;
var ist = Stopwatch.StartNew();
var throttler = new SemaphoreSlim(10);
var bc = new BlockingCollection<IndexedBatch<TData>>();
var importTask = Task.Run(() =>
{
bc.GetConsumingEnumerable()
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
//or
//.WithDegreeOfParallelism(1)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.ForAll(data =>
{
var st = Stopwatch.StartNew();
importProceedure(data);
Console.WriteLine(@"Batch imported {0} in {1} ms", data.Index, st.ElapsedMilliseconds);
throttler.Release();
});
});
var processTask = Task.Run(() =>
{
dataLoader.GetParsedItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
//or
//.WithDegreeOfParallelism(1)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.ForAll(batch =>
{
throttler.Wait(); //.WaitAsync()
var batchno = ++batchCounter;
var st = Stopwatch.StartNew();
bc.Add(new IndexedBatch<TData>(batchno, processProceedure(batch)));
Console.WriteLine(@"Batch processed {0} in {1} ms", batchno, st.ElapsedMilliseconds);
});
});
processTask.Wait();
bc.CompleteAdding();
importTask.Wait();
Console.WriteLine(nl(1) + @"Loading {0} completed in {1} ms", typeof(TData).ToString(), ist.ElapsedMilliseconds);
}
public class IndexedBatch<TBatch>
where TBatch : class
{
public IndexedBatch(int index, List<TBatch> batch)
{
Index = index;
Batch = batch ?? new List<TBatch>();
}
public int Index { get; set; }
public List<TBatch> Batch { get; set; }
}
これは、LoadData に対して行われる呼び出しです。
LoadData<DataBase, Data>(
DataLoaderFactory.Create<DataBase>(datafilePath),
1024,
(data) =>
{
using (var session = Store.OpenSession())
{
foreach (var i in data.Batch)
{
session.Store(i);
d.TryAdd(i.LongId.GetHashCode(), int.Parse(i.Id.Substring(i.Id.LastIndexOf('/') + 1)));
}
session.SaveChanges();
}
},
(batch) =>
{
return batch.Select(i => new Data()
{
...
}).ToList();
}
);
Store は RavenDb IDocumentStore です。DataLoaderFactory は、指定されたデータセットのカスタム パーサーを構築します。