8

大量のデータを処理するためのコードを書いていたので、Parallel.ForEachで作成するスレッドごとにファイルを作成して、出力を同期する必要がないようにすると便利だと思いました(少なくとも私は)。

これは次のようになります。

Parallel.ForEach(vals,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    ()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
    (item, state, writer)=>
    {
        if(something)
        {
            state.Break();
            return writer;
        }
        List<Result> results = new List<Result>();

        foreach(var subItem in item.SubItems)
            results.Add(ProcessItem(subItem));

        if(results.Count > 0)
        {
            foreach(var result in results)
                result.Write(writer);
        }
        return writer;
    },
    (writer)=>writer.Dispose());

私が予想したことは、最大8つのファイルが作成され、実行時間全体にわたって存続することでした。その後、ForEach呼び出し全体が終了すると、それぞれが破棄されます。実際に起こることは、localInitがアイテムごとに1回呼び出されるように見えるため、何百ものファイルが作成されることになります。ライターは、処理される各アイテムの最後にも廃棄されます。

これは、同じことが起こっていることを示しています。

var vals = Enumerable.Range(0, 10000000).ToArray();
        long sum = 0;
        Parallel.ForEach(vals,
            new ParallelOptions { MaxDegreeOfParallelism = 8 },
            () => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
            (i, state, common) =>
            {
                Thread.Sleep(10);
                return common + i;
            },
                (common) => Interlocked.Add(ref sum, common));

そうか:

init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18

注:Thread.Sleep呼び出しを省略した場合、「正しく」機能しているように見えることがあります。localInitは、PCで使用することを決定した4つのスレッドに対してそれぞれ1回だけ呼び出されます。ただし、毎回ではありません。

これは関数の望ましい動作ですか?これを行う原因となる舞台裏で何が起こっているのでしょうか?そして最後に、私の希望する機能であるThreadLocalを取得するための良い方法は何ですか?

ちなみに、これは.NET4.5にあります。

4

4 に答える 4

8

Parallel.ForEachあなたが思うように動作しません。Taskメソッドはクラスの上に構築されており、との間の関係は1:1TaskThreadはないことに注意することが重要です。たとえば、2つの管理対象スレッドで実行される10個のタスクを持つことができます。

現在の行の代わりに、メソッド本体でこの行を使用してみてください。

Console.WriteLine("ThreadId {0} -- TaskId {1} ",
                  Thread.CurrentThread.ManagedThreadId, Task.CurrentId);

ThreadIdが一意のIDで示されているように、多くの異なるタスクで再利用されることがわかります。への呼び出しを残したり、増やしたりすると、これがさらに表示されますThread.Sleep

メソッドがどのように機能するかについての(非常に)基本的な考え方はParallel.ForEach、列挙型が列挙型のプロセスセクションを実行する一連のタスクを作成することです。これが行われる方法は、入力に大きく依存します。完了せずに特定のミリ秒数を超えるタスクの場合をチェックするいくつかの特別なロジックもあります。その場合は、作業を軽減するために新しいタスクが生成される可能性があります。

localinitの関数のドキュメントを見ると、各スレッドParallel.ForEachではなく、それが記載されていることがわかります。returns the initial state of the local data for each _task_

なぜ8つ以上のタスクが生成されているのかと疑問に思うかもしれません。その答えは、のドキュメントにある最後の答えと似ていますParallelOptions.MaxDegreeOfParallelism

デフォルトから変更すると、使用される同時タスクの数がMaxDegreeOfParallelism制限されるだけです。

この制限は、同時タスクの数にのみ適用され、処理中の全時間中に作成されるタスクの数にハード制限はありません。また、前述したように、別のタスクが生成される場合があります。その結果、localinit関数が複数回呼び出され、数百のファイルがディスクに書き込まれます。

特に同期I/Oを使用している場合、ディスクへの書き込みは確かに少し待ち時間のある操作です。ディスク操作が発生すると、スレッド全体がブロックされます。同じことが。でも起こりThread.Sleepます。これTaskを行うと、現在実行中のスレッドがブロックされ、他のタスクを実行できなくなります。通常、これらの場合、スケジューラーはTaskスラックを拾うのを助けるために新しいものをスポーンします。

そして最後に、私の希望する機能であるThreadLocalを取得するための良い方法は何ですか?

Parallel.ForEach肝心なのは、スレッドを扱っていないため、スレッドローカルは意味をなさないということです。あなたはタスクを扱っています。多くのタスクが同時に同じスレッドを使用できるため、ローカルスレッドをタスク間で共有できます。また、タスクのスレッドローカルは実行の途中で変更される可能性があります。これは、スケジューラがタスクの実行をプリエンプトしてから、別のスレッドローカルを持つ別のスレッドで実行を続行できるためです。

最善の方法はわかりませんが、localinit関数を使用して任意のリソースを渡し、一度に1つのスレッドでのみリソースを使用できるようにすることができます。を使用して、localfinally使用されていないことをマークし、別のタスクで取得できるようにすることができます。これは、それらのメソッドが設計された目的です。各メソッドは、生成されるタスクごとに1回だけ呼び出されます(Parallel.ForEachMSDNドキュメントの備考セクションを参照してください)。

自分で作業を分割し、独自のスレッドセットを作成して、作業を実行することもできます。しかし、私の意見では、Parallelクラスはすでにあなたのためにこの重労働を行っているので、これはあまり考えられていません。

于 2013-01-20T09:30:34.280 に答える
2

あなたが見ているのは、あなたの仕事をできるだけ早く終わらせようとしている実装です。

これを行うために、スループットを最大化するためにさまざまな数のタスクを使用しようとします。スレッドプールから特定の数のスレッドを取得し、作業を少し実行します。次に、スレッドの追加と削除を試みて、何が起こるかを確認します。すべての作業が完了するまで、これを続けます。

アルゴリズムは、作業が大量のCPUを使用しているか、大量のIOを使用しているか、または同期が多くスレッドが相互にブロックしている場合でも、それがわからないという点で非常に馬鹿げています。スレッドを追加および削除し、各作業単位が完了する速度を測定するだけです。

これは、スレッドを挿入およびリタイアするときに、継続的にyourlocalInitおよびlocalFinallyfunctionsを呼び出していることを意味します。これは、あなたが見つけたものです。

残念ながら、このアルゴリズムを制御する簡単な方法はありません。Parallel.ForEachは、スレッド管理コードの多くを意図的に隠す高レベルの構造です。


を使用すると少し役立つかもしれませんが、新しいスレッドを要求しThreadLocalたときにスレッドプールが同じスレッドを再利用するという事実に依存しています。Parallel.ForEachこれは保証されていません。実際、スレッドプールが呼び出し全体で正確に8つのスレッドを使用する可能性はほとんどありません。これは、必要以上のファイルを作成することを意味します。


保証されていること1つは、一度Parallel.ForEachに複数のMaxDegreeOfParallelismスレッドを使用することは決してないということです。

これを有利に使用するには、特定の時間に実行されているスレッドで再利用できるファイルの固定サイズの「プール」を作成します。MaxDegreeOfParallelism一度に実行できるのはスレッドだけなので、を呼び出す前にその数のファイルを作成できますForEach。次に、1つを取得し、localInitで解放しますlocalFinally

もちろん、このプールは自分で作成する必要があり、同時に呼び出されるため、スレッドセーフである必要があります。ただし、ロックのコストと比較して、スレッドは非常に迅速に注入およびリタイアされないため、単純なロック戦略で十分です。

于 2013-01-20T09:25:25.453 に答える
1

MSDNによると、localInitメソッドはスレッドごとではなく、タスクごとに1回呼び出されます。

localInitデリゲートは、ループの実行に参加するタスクごとに1回呼び出され、それらの各タスクの初期ローカル状態を返します。

于 2013-01-20T09:26:09.687 に答える
-1

localInitは、スレッドが作成されたときに呼び出されます。bodyに時間がかかる場合は、別のスレッドを作成して現在のスレッドを一時停止する必要があります。別のスレッドを作成する場合は、localInitを呼び出します。

また、Parallel.ForEachが呼び出されると、MaxDegreeOfParallelism値と同じ数のスレッドが作成されます。例:

var k = Enumerable.Range(0, 1);
Parallel.ForEach(k,new ParallelOptions(){MaxDegreeOfParallelism = 4}.....

最初に呼び出されたときに4つのスレッドを作成します

于 2013-01-20T08:04:44.603 に答える