9

フラットファイルを列挙する複数の列挙子があります。私は元々、Parallel Invokeに各列挙子を持っていて、各アクションがに追加され、BlockingCollection<Entity>そのコレクションはConsumingEnumerable()を返していました。

public interface IFlatFileQuery
{
    IEnumerable<Entity> Run();
}

public class FlatFile1 : IFlatFileQuery
{
    public IEnumerable<Entity> Run()
    {
        // loop over a flat file and yield each result
        yield return Entity;
    }
} 

public class Main
{
    public IEnumerable<Entity> DoLongTask(ICollection<IFlatFileQuery> _flatFileQueries)
    {
            // do some other stuff that needs to be returned first:
            yield return Entity;

            // then enumerate and return the flat file data
        foreach (var entity in GetData(_flatFileQueries))
        {
            yield return entity;
        }
    }

    private IEnumerable<Entity> GetData(_flatFileQueries)
    {
        var buffer = new BlockingCollection<Entity>(100);

        var actions = _flatFileQueries.Select(fundFileQuery => (Action)(() =>
        {
            foreach (var entity in fundFileQuery.Run())
            {
                buffer.TryAdd(entity, Timeout.Infinite);
            }
        })).ToArray();

        Task.Factory.StartNew(() =>
        {
            Parallel.Invoke(actions);

            buffer.CompleteAdding();
        });

        return buffer.GetConsumingEnumerable();
    }
}

ただし、少しテストした結果、以下のコード変更は約20〜25%高速であることがわかりました。

private IEnumerable<Entity> GetData(_flatFileQueries)
{
    return _flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run());
}

コード変更の問題は、すべてのフラットファイルクエリが列挙されるまで待機してから、列挙して生成できるロット全体を返すことです。

どういうわけかそれをさらに速くするために上記のコードのビットを生み出すことは可能でしょうか?

すべてのフラットファイルクエリの合計結果は、せいぜい1000程度のエンティティにすぎない可能性があることを付け加えておきます。

編集:以下に変更しても、実行時間に違いはありません。(R#は、以前の状態に戻すことも提案しています)

private IEnumerable<Entity> GetData(_flatFileQueries)
{
        foreach (var entity in _flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run()))
        {
            yield return entity;
        }
}
4

4 に答える 4

3

コード変更の問題は、すべてのフラットファイルクエリが列挙されるまで待機してから、列挙して生成できるロット全体を返すことです。

簡単な例でそれが誤りであることを証明しましょう。TestQueryまず、一定時間後に単一のエンティティを生成するクラスを作成しましょう。次に、いくつかのテストクエリを並行して実行し、結果が得られるまでにかかった時間を測定しましょう。

public class TestQuery : IFlatFileQuery {

    private readonly int _sleepTime;

    public IEnumerable<Entity> Run() {
        Thread.Sleep(_sleepTime);
        return new[] { new Entity() };
    }

    public TestQuery(int sleepTime) {
        _sleepTime = sleepTime;
    }

}

internal static class Program {

    private static void Main() {
        Stopwatch stopwatch = Stopwatch.StartNew();
        var queries = new IFlatFileQuery[] {
            new TestQuery(2000),
            new TestQuery(3000),
            new TestQuery(1000)
        };
        foreach (var entity in queries.AsParallel().SelectMany(ffq => ffq.Run()))
            Console.WriteLine("Yielded after {0:N0} seconds", stopwatch.Elapsed.TotalSeconds);
        Console.ReadKey();
    }

}

このコードは次のように出力します。

1秒
後に降伏2秒
後に降伏3秒後に降伏

この出力を見るとAsParallel()、利用可能になるとすぐに各結果が得られるため、すべてが正常に機能します。並列度によってタイミングが異なる場合があることに注意してください(並列度が1の「2s、5s、6s」など)。事実上、操作全体がまったく並列ではなくなります。この出力は、4コアマシンからのものです。

スレッド間に共通のボトルネック(共有ロックされたリソースなど)がない場合、長い処理はコアの数に応じて拡張される可能性があります。dotTraceなどのツールを使用して改善できる遅い部分があるかどうかを確認するために、アルゴリズムのプロファイルを作成することをお勧めします。

于 2012-11-14T16:52:28.440 に答える
2

私はあなたのコードのどこにも危険信号がないと思います。法外な非効率性はありません。私はそれが複数の小さな違いに帰着すると思います。

PLINQは、データのストリームの処理に非常に優れています。内部的には、同期リストにアイテムを1つずつ追加するよりも効率的に機能します。各呼び出しには内部TryAddで少なくとも2つの操作が必要なため、への呼び出しがボトルネックになっていると思われます。Interlockedすべてのスレッドが同じキャッシュラインをめぐって競合するため、これらはプロセッサ間メモリバスに多大な負荷をかける可能性があります。

PLINQは、内部的にバッファリングを行うため、より安価です。アイテムを1つずつ出力するわけではないと思います。おそらくそれはそれらをバッチ処理し、複数のアイテムにわたってそのように同期化コストを償却します。

2番目の問題は、の制限された容量ですBlockingCollection。100は多くありません。これは多くの待機につながる可能性があります。カーネルへの呼び出しとコンテキストスイッチが必要になるため、待機にはコストがかかります。

于 2012-11-14T17:24:15.283 に答える
2

私は、どのシナリオでもうまく機能するこの代替案を作成します。

これは私のために働きます:

  • Parallel.Foreach Enqueueのタスクで、ConcurrentQueueのアイテムが処理されるように変換されました。
  • タスクには、そのタスクが終了したことを示すフラグをマークする続行があります。
  • タスクを使用した同じ実行スレッドで、しばらくの間デキューが終了し、

私にとって速くて素晴らしい結果:

Task.Factory.StartNew (() =>
{
    Parallel.ForEach<string> (TextHelper.ReadLines(FileName), ProcessHelper.DefaultParallelOptions,
    (string currentLine) =>
    {
        // Read line, validate and enqeue to an instance of FileLineData (custom class)
    });
}).
ContinueWith 
(
    ic => isCompleted = true 
);


while (!isCompleted || qlines.Count > 0)
{
    if (qlines.TryDequeue (out returnLine))
    {
        yield return returnLine;
    }
}
于 2013-05-20T06:00:17.607 に答える
0

デフォルトでは、ParallelQueryクラスはIEnumerable<T>ソースで作業しているときに、「チャンクパーティショニング」と呼ばれるパーティショニング戦略を採用しています。この戦略では、各ワーカースレッドが毎回徐々に多くのアイテムを取得します。これは、入力バッファがあることを意味します。次に、結果は、クエリの利用者が利用できるようになる前に、システムによって選択されたサイズの出力バッファに蓄積されます。EnumerablePartitionerOptions.NoBuffering構成オプションとを使用して、両方のバッファーを無効にすることができますParallelMergeOptions.NotBuffered

private IEnumerable<Entity> GetData(ICollection<IFlatFileQuery> flatFileQueries)
{
    return Partitioner
        .Create(flatFileQueries, EnumerablePartitionerOptions.NoBuffering)
        .AsParallel()
        .AsOrdered()
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .SelectMany(ffq => ffq.Run());
}

このようにして、各ワーカースレッドは一度に1つのアイテムのみを取得し、計算されるとすぐに結果を伝播します。

NoBuffering:一度に列挙可能なソースからアイテムを取得し、複数のスレッドからより効率的にアクセスできる中間ストレージを使用しないパーティショナーを作成します。このオプションは、低レイテンシーのサポートを提供し(アイテムはソースから利用可能になるとすぐに処理されます)、アイテム間の依存関係を部分的にサポートします(スレッドは、スレッド自体が処理を担当するアイテムを待機してデッドロックすることはできません)。

NotBuffered:出力バッファーなしでマージを使用します。結果要素が計算されたらすぐに、その要素をクエリの利用者が利用できるようにします。

于 2020-02-08T01:53:17.603 に答える