0

時間がかかる Azure Table Storage などのデータ ソースから読み取り、データを json または csv に変換し、パーティション キーに応じたファイル名でローカル ファイルに書き込む最善の方法を探しています。
検討中のアプローチの 1 つは、一定の時間間隔でタイマー経過イベント トリガーでファイルへの書き込みタスクを実行することです。

4

1 に答える 1

3

適切に並列化されないもの (I/O など) については、「プロデューサー/コンシューマー モデル」を使用するのが最善の方法です。

それが機能する方法は、並列化不可能なタスクを処理する1つのスレッドがあり、そのタスクが行うことはすべてバッファに読み込まれることです。次に、すべてがバッファから読み取ってデータを処理する一連の並列タスクがあり、データの処理が完了すると、データを別のバッファに入れます。その後、並列化できない方法で結果を再度書き出す必要がある場合は、別の単一のタスクで結果を書き出す必要があります。

public Stream ProcessData(string filePath)
{
    using(var sourceCollection = new BlockingCollection<string>())
    using(var destinationCollection = new BlockingCollection<SomeClass>())
    {
        //Create a new background task to start reading in the file
        Task.Factory.StartNew(() => ReadInFile(filePath, sourceCollection), TaskCreationOptions.LongRunning);

        //Create a new background task to process the read in lines as they come in
        Task.Factory.StartNew(() => TransformToClass(sourceCollection, destinationCollection), TaskCreationOptions.LongRunning);

        //Process the newly created objects as they are created on the same thread that we originally called the function with
        return TrasformToStream(destinationCollection);
    }
}

private static void ReadInFile(string filePath, BlockingCollection<string> collection)
{
    foreach(var line in File.ReadLines(filePath))
    {
        collection.Add(line);
    }

    //This lets the consumer know that we will not be adding any more items to the collection.
    collection.CompleteAdding();
}

private static void TransformToClass(BlockingCollection<string> source, BlockingCollection<SomeClass> dest)
{
    //GetConsumingEnumerable() will take items out of the collection and block the thread if there are no items available and CompleteAdding() has not been called yet.
    Parallel.ForEeach(source.GetConsumingEnumerable(), 
                      (line) => dest.Add(SomeClass.ExpensiveTransform(line));

    dest.CompleteAdding();
}

private static Stream TrasformToStream(BlockingCollection<SomeClass> source)
{
    var stream = new MemoryStream();
    foreach(var record in source.GetConsumingEnumerable())
    {
        record.Seralize(stream);
    }
    return stream;
}

無料の本Patterns for Parallel Programmingを読むことを強くお勧めします。これについて詳しく説明しています。Producer-Consumer モデルを詳細に説明するセクション全体があります。

更新:ループ内のParallel Extension ExtrasからではGetConsumingPartitioner()なく、小規模なパフォーマンス ブート用に使用します。渡されることについていくつかの仮定を行い、必要のない余分なロックを取得します。列挙型の代わりにパーティショナーを渡すことで、これらの余分なロックを取得する必要がなくなります。GetConsumingEnumerable()Parallel.ForEachForEachIEnumerable

于 2013-10-26T16:01:55.663 に答える