次の静的拡張メソッドのオーバーロードを参照してください。Parallel.ForEach
public static ParallelLoopResult ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody,
Action<TLocal> localFinally
)
あなたの特定の例では
この線:
() => 0, // method to initialize the local variable
定数の整数ゼロを返す単純なラムダ (無名関数) です。このラムダはlocalInit
パラメーターとして に渡されますParallel.ForEach
- ラムダは整数を返すため、型がFunc<int>
あり、型はコンパイラーによってTLocal
推測できます (同様に、パラメーターとして渡されたコレクションの型から推測できます) 。int
TSource
source
次に、戻り値 (0) が 3 番目のパラメータ ( という名前subtotal
)として に渡されますtaskBody
Func
。この (0) は、ボディ ループの初期シードとして使用されます。
(j, loop, subtotal) =>
{
subtotal += nums[j]; //modify local variable (Bad idea, see comment)
return subtotal; // value to be passed to next iteration
}
この 2 番目のラムダ ( に渡されるtaskBody
) は、N 回呼び出されます。N は、TPL パーティショナーによってこのタスクに割り当てられたアイテムの数です。
taskBody
2 番目のラムダへの後続の各呼び出しは、このタスクの実行中の部分的なsubTotal
合計を効果的に計算する、の新しい値を渡します。このタスクに割り当てられたすべての項目が追加された後、3 番目で最後の関数パラメーターが再度呼び出され、から返された の最終値が渡されます。このようないくつかのタスクが並行して動作するため、すべての部分合計を最終的な「総計」に合計する最終ステップも必要になります。ただし、複数の同時タスク (異なるスレッド上) が変数をめぐって競合する可能性があるため、変数への変更はスレッドセーフな方法で行うことが重要です。localFinally
subtotal
taskBody
grandTotal
(より明確にするために、MSDN 変数の名前を変更しました)
long grandTotal = 0;
Parallel.ForEach(nums, // source collection
() => 0, // method to initialize the local variable
(j, loop, subtotal) => // method invoked by the loop on each iteration
subtotal + nums[j], // value to be passed to next iteration subtotal
// The final value of subtotal is passed to the localFinally function parameter
(subtotal) => Interlocked.Add(ref grandTotal, subtotal)
MS の例では、タスク本体内のパラメーター subtotal の変更は不適切な方法であり、不要です。つまり、コードは、ラムダの省略形射影に省略できるのでsubtotal += nums[j]; return subtotal;
、より良いでしょう。return subtotal + nums[j];
(j, loop, subtotal) => subtotal + nums[j]
一般に
Parallel.For / Parallel.ForEachのlocalInit / body / localFinally
オーバーロードにより、タスクごとに 1 回の初期化とクリーンアップ コードを、タスクによって反復が実行される前と後に (それぞれ)実行できます。taskBody
( for 範囲に注意してください / 並列に渡された Enumerable For
/Foreach
は のバッチに分割されIEnumerable<>
、それぞれにタスクが割り当てられます)
各 TaskでlocalInit
が1body
回呼び出され、コードが繰り返し呼び出され、アイテムごとに 1 回 (0..N
回)バッチで呼び出され、localFinally
完了時に 1 回呼び出されます。
さらに、からの一般的な戻り値を介して、タスクの期間に必要な状態 (つまり、taskBody
およびlocalFinally
デリゲート) を渡すことができます。以下でこの変数を呼び出しました。TLocal
localInit Func
taskLocals
「localInit」の一般的な用途:
- データベース接続や Web サービス接続など、ループ本体に必要な高価なリソースの作成と初期化。
- 実行中の合計またはコレクションを保持する (競合しない) タスクローカル変数を保持する
localInit
からtaskBody
andに複数のオブジェクトを返す必要がある場合localFinally
は、厳密に型指定されたクラス a Tuple<,,>
or を使用できます。 にラムダのみを使用する場合はlocalInit / taskBody / localFinally
、匿名クラスを介してデータを渡すこともできます。return を使用してlocalInit
複数のタスク間で参照型を共有する場合は、このオブジェクトのスレッド セーフを考慮する必要があることに注意してください。不変性が望ましいです。
「localFinally」アクションの一般的な使用法:
- (データベース接続、ファイル ハンドル、Web サービス クライアントなど) で
IDisposables
使用されているリソースを解放します。taskLocals
- 各タスクによって行われた作業を共有変数に集約/結合/削減します。これらの共有変数は競合するため、スレッドの安全性が懸念されます。
- たとえば
Interlocked.Increment
、整数のようなプリミティブ型
lock
または同様のものが書き込み操作に必要になります
- 同時収集を利用して、時間と労力を節約します。
これtaskBody
はtight
ループ操作の一部です。パフォーマンスのためにこれを最適化する必要があります。
これはすべて、コメント付きの例で最もよく要約されています。
public void MyParallelizedMethod()
{
// Shared variable. Not thread safe
var itemCount = 0;
Parallel.For(myEnumerable,
// localInit - called once per Task.
() =>
{
// Local `task` variables have no contention
// since each Task can never run by multiple threads concurrently
var sqlConnection = new SqlConnection("connstring...");
sqlConnection.Open();
// This is the `task local` state we wish to carry for the duration of the task
return new
{
Conn = sqlConnection,
RunningTotal = 0
}
},
// Task Body. Invoked once per item in the batch assigned to this task
(item, loopState, taskLocals) =>
{
// ... Do some fancy Sql work here on our task's independent connection
using(var command = taskLocals.Conn.CreateCommand())
using(var reader = command.ExecuteReader(...))
{
if (reader.Read())
{
// No contention for `taskLocal`
taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]);
}
}
// The same type of our `taskLocal` param must be returned from the body
return taskLocals;
},
// LocalFinally called once per Task after body completes
// Also takes the taskLocal
(taskLocals) =>
{
// Any cleanup work on our Task Locals (as you would do in a `finally` scope)
if (taskLocals.Conn != null)
taskLocals.Conn.Dispose();
// Do any reduce / aggregate / synchronisation work.
// NB : There is contention here!
Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
}
その他の例:
タスクごとの非競合辞書の例
タスクごとのデータベース接続の例