3

大きなデータセットを処理し、結果のデータを RavenDb にインポートするコードの並列化を行う良い方法を見つけようとしています。

データ処理は CPU バウンドであり、データベース インポート IO バウンドです。

Environment.ProcessorCount スレッド数で並列処理を行うソリューションを探しています。結果のデータは、上記のプロセスと並行して、x (10 としましょう) のプールされたスレッドで RavenDb にインポートする必要があります。

ここでの主なことは、インポートが完了するのを待っている間に次のデータセットの処理が続行されるように、完了したデータがインポートされている間に処理を続行することです。

もう 1 つの問題は、プライベート ワーキング メモリが 5GB を超える可能性があるため、インポートが成功した後に各バッチのメモリを破棄する必要があることです。

以下のコードは、これまでに得たものです。上記の並列化要件を満たさないことに注意してください。

datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(batch =>
    {
        Task.Run(() =>
        {
            ...
        }
    }

GetDataItem は、バッチ データセットに分割された列挙可能なデータ項目を生成します。GetDataItem は、約 2,000,000 個のアイテムを生成し、それぞれ平均約 0.3 ミリ秒の処理を行います。

このプロジェクトは、x64 プラットフォーム上の最新の .NET 4.5 RC で実行されています。

アップデート。

私の現在のコード (上記参照) は、アイテムを取得し、それらをバッチで分割します。各バッチは、8 つのスレッド (i7 では Environment.ProcessorCount) で並行して処理されます。処理は遅く、CPU バウンドで、メモリを大量に消費します。1 つのバッチの処理が完了すると、結果のデータを RavenDb に非同期にインポートするタスクが開始されます。バッチ インポート ジョブ自体は同期的で、次のようになります。

using (var session = Store.OpenSession())
{
    foreach (var data in batch)
    {
        session.Store(data);
    }
    session.SaveChanges();
}

このアプローチにはいくつかの問題があります。

  1. バッチが完了するたびに、インポート ジョブを実行するタスクが開始されます。並行して実行するタスクの数を制限したい (例: 最大 10)。さらに、多くのタスクが開始されていても、それらが並行して実行されることはないようです。

  2. メモリ割り当ては大きな問題です。バッチが処理/インポートされると、まだメモリに残っているようです。

上記の問題を解決する方法を探しています。理想的には私が欲しい:

  • 負荷の高いデータのバッチ処理を行う論理プロセッサごとに 1 つのスレッド。
  • 完了したバッチが RavenDb にインポートされるのを待っている 10 ほどの並列スレッド。
  • メモリの割り当てを最小限に抑えるため、インポート タスクの完了後にバッチの割り当てを解除します。
  • バッチ処理用のスレッドの 1 つでインポート ジョブを実行しない。完了したバッチのインポートは、処理中の次のバッチと並行して実行する必要があります。

解決

var batchSize = 10000;
var bc = new BlockingCollection<List<Data>>();
var importTask = Task.Run(() =>
{
    bc.GetConsumingEnumerable()
        .AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .ForAll(batch =>
        {
            using (var session = Store.OpenSession())
            {
                foreach (var i in batch) session.Store(i);
                session.SaveChanges();
            }
        });
});
var processTask = Task.Run(() =>
{
    datasupplier.GetDataItems()
        .Partition(batchSize)
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount)
        .ForAll(batch =>
        {
            bc.Add(batch.Select(i => new Data()
            {
                ...
            }).ToList());
        });
});

processTask.Wait();
bc.CompleteAdding();
importTask.Wait();
4

3 に答える 3

3

あなたのタスク全体は、生産者と消費者のワークフローのように聞こえます。バッチ プロセッサはプロデューサーであり、RavenDB データの「インポート」はプロデューサーの出力のコンシューマーです。

BlockingCollection<T>バッチ プロセッサと db インポータの間の接続としてを使用することを検討してください。db インポーターは、バッチ プロセッサが完了したバッチをブロッキング コレクションにプッシュするとすぐにウェイクアップし、「キャッチアップ」してコレクションを空にするとスリープ状態に戻ります。

バッチ プロセッサ プロデューサーはフル スロットルで実行でき、以前に完了したバッチを処理する db インポーター タスクと同時に常に実行されます。バッチ プロセッサが db インポーターよりも先に進む可能性があることを懸念している場合 (b/c db インポートは、各バッチを処理するよりも大幅に時間がかかります)、ブロッキング コレクションに上限を設定して、プロデューサーが追加時にブロックするようにすることができます。その限界を超えて、消費者に追いつく機会を与えます。

しかし、あなたのコメントのいくつかは心配です。Task インスタンスをスピンアップして、バッチ処理に対して非同期で db import を実行することに特に問題はありません。タスク != スレッド。新しいタスク インスタンスを作成する場合、新しいスレッドを作成する場合と同じような莫大なオーバーヘッドはありません。

スレッドを正確に制御しようとすることに夢中にならないでください。コアとまったく同じ数のバケットが必要であると指定したとしても、それらのコアを排他的に使用することはできません。他のプロセスからの何百もの他のスレッドが、タイム スライスの間にスケジュールされます。タスクを使用して作業の論理単位を指定し、TPL にスレッド プールを管理させます。誤ったコントロール感覚のフラストレーションを自分で救ってください。;>

コメントでは、タスクが互いに非同期で実行されているようには見えず (これをどのように判断していますか?)、各バッチが終了した後にメモリが解放されないように見えることを示しています。最初にこれら 2 つの問題の原因がわかるまで、すべてをドロップすることをお勧めします。どこかで Dispose() を呼び出すのを忘れていませんか? オブジェクトのツリー全体を不必要に存続させている参照を保持していませんか? 正しいものを測定していますか?並列タスクはブロックしているデータベースまたはネットワーク I/O によってシリアル化されていますか? これら 2 つの問題が解決されるまでは、並列処理計画が何であるかは重要ではありません。

于 2012-07-26T23:49:29.250 に答える
1

バッチごとに、タスクを開始しています。これは、ループが非常に迅速に完了することを意味します。それはあなたが望んでいたものではない(バッチの数)タスクを残します。あなたが欲しかった(CPUの数)。

解決策: バッチごとに新しいタスクを開始しないでください。for ループは既に並列化されています。

あなたのコメントに応えて、ここに改善されたバージョンがあります:

//this runs in parallel
var processedBatches = datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .Select(x => ProcessCpuBound(x));

foreach (var batch in processedBatches) {
 PerformIOIntensiveWorkSingleThreadedly(batch); //this runs sequentially
}
于 2012-07-26T20:09:48.767 に答える