1

今週初めに Stackoverflow でいくつかの助けを得て、処理をロードし、大きなデータセットを RavenDb にインポートするためのプロデューサー/コンシューマー パターンを進めました。 IO バウンドで継続する CPU バウンド タスクの並列化

現在、メモリ消費を管理するために、プロデューサーが事前に準備する作業単位の量を調整しようとしています。基本的なセマフォを使用してスロットリングを実装しましたが、特定の時点で実装がデッドロックするという問題が発生しています。

何がデッドロックを引き起こしているのかわかりません。以下はコードの抜粋です。

private static void LoadData<TParsedData, TData>(IDataLoader<TParsedData> dataLoader, int batchSize, Action<IndexedBatch<TData>> importProceedure, Func<IEnumerable<TParsedData>, List<TData>> processProceedure)
    where TParsedData : class
    where TData : class
{
    Console.WriteLine(@"Loading {0}...", typeof(TData).ToString());

    var batchCounter = 0;

    var ist = Stopwatch.StartNew();

    var throttler = new SemaphoreSlim(10);
    var bc = new BlockingCollection<IndexedBatch<TData>>();
    var importTask = Task.Run(() =>
    {
        bc.GetConsumingEnumerable()
            .AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            //or
            //.WithDegreeOfParallelism(1)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .ForAll(data =>
            {
                var st = Stopwatch.StartNew();
                importProceedure(data);

                Console.WriteLine(@"Batch imported {0} in {1} ms", data.Index, st.ElapsedMilliseconds);
                throttler.Release();
            });
    });
    var processTask = Task.Run(() =>
    {
        dataLoader.GetParsedItems()
            .Partition(batchSize)
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            //or
            //.WithDegreeOfParallelism(1)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .ForAll(batch =>
            {
                throttler.Wait(); //.WaitAsync()
                var batchno = ++batchCounter;
                var st = Stopwatch.StartNew();

                bc.Add(new IndexedBatch<TData>(batchno, processProceedure(batch)));

                Console.WriteLine(@"Batch processed {0} in {1} ms", batchno, st.ElapsedMilliseconds);
            });
    });

    processTask.Wait();
    bc.CompleteAdding();
    importTask.Wait();

    Console.WriteLine(nl(1) + @"Loading {0} completed in {1} ms", typeof(TData).ToString(), ist.ElapsedMilliseconds);
}

public class IndexedBatch<TBatch> 
    where TBatch : class
{
    public IndexedBatch(int index, List<TBatch> batch)
    {
        Index = index;
        Batch = batch ?? new List<TBatch>();
    }

    public int Index { get; set; }
    public List<TBatch> Batch { get; set; }
}

これは、LoadData に対して行われる呼び出しです。

LoadData<DataBase, Data>(
    DataLoaderFactory.Create<DataBase>(datafilePath),
    1024,
    (data) =>
    {
        using (var session = Store.OpenSession())
        {
            foreach (var i in data.Batch)
            {
                session.Store(i);
                d.TryAdd(i.LongId.GetHashCode(), int.Parse(i.Id.Substring(i.Id.LastIndexOf('/') + 1)));
            }
            session.SaveChanges();
        }
    },
    (batch) =>
    {
        return batch.Select(i => new Data()
        {
            ...
        }).ToList();
    }
);

Store は RavenDb IDocumentStore です。DataLoaderFactory は、指定されたデータセットのカスタム パーサーを構築します。

4

2 に答える 2

1

「ここにブロックがあります!」という大きな矢印がなければ、デッドロックをデバッグするのは困難です。デバッガーなしでコードのデバッグを回避する: BlockingCollection は既に調整できます。引数を取るコンストラクターを使用してint boundedCapacity、セマフォを削除します。デッドロックを解決する非常に高いオッズ。

于 2012-07-29T17:52:19.587 に答える
1

スレッド数を確認できますか?ブロッキングが原因でスレッドプールを使い果たした可能性があります。TPL は、スレッドがProcessorCountないとコードがデッドロックすると考える場合よりも多くのスレッドを挿入します。ただし、一定の制限までしか実行できません。

とにかく、組み込みのヒューリスティックはノンブロッキングのもので最もよく機能するため、TPLタスク内でブロックすることは一般的に悪い考えです。

于 2012-07-29T23:10:32.967 に答える