2

プログラムで使用されているParallel.ForEach()ステートメントがあります。いくつかのオブジェクトのリストを入力として使用します。出力順序は気にしませんが、入力リストと同じ順序で入力要素を取得するには、このループが必要です。これを達成することは可能Parallel.ForEach()ですか?

4

3 に答える 3

3

からの順序を保持する必要がある場合は、さまざまなカスタム パーティショナーIEnumerable<T>を実装することをお勧めします。このクラスのコード例には、列挙から昇順で一度に 1 つずつ取得する簡単な例が含まれています。OrderablePartitioner<T>

ただし、これは、次のような単純な生産者と消費者のモデルである可能性があるものにとっては多くの作業ですConcurrentQueue<T>

var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Action consumer = () =>
{
    X x;
    while (queue.TryDequeue(out x))
    {
        x.Frob();
    }
};

// At most N "in flight"
int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);

このコードを使用すると、First-In-First-Out の動作が保証され、リクエストがほぼ受信順に「進行中」に処理されることが保証されます。並列に配置すると、それらが連続した順序のままであるという保証はありません。

または、以下を使用することもできます (キュー アイテムの数が固定されているという制限があります)。

// Executes exactly queue.Count iterations at the time of Parallel.ForEach
// due to "snapshot" isolation of ConcurrentQueue<X>.GetEnumerator()
var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Parallel.ForEach(
    queue,
    _ =>
    {
        X x;
        if (queue.TryDequeue(out x))
        {
            x.Frob();
        }
    });

1 つのスレッドで生成を続け、他のスレッドで消費する場合はBlockingCollection<T>、バッキング コレクションとしてキューを使用します。

var queue = new BlockingCollection<X>(new ConcurrentQueue<X>());

// add to it
Task.Factory.StartNew( () =>
    {
         foreach (var x in yourEnumerableOfX)
         {
             queue.Add(x);
             Thread.Sleep(200);
         }

         // Signal to our consumers we're done:
         queue.CompleteAdding();
    });

キューアイテムがいくつ存在するか正確にわからないため、「無制限の」コンシューマーが必要です。

// Roughly the same consumer code as above, but 'unbounded'
Action consumer = () =>
{
    while (!queue.IsCompleted)
    {
        X x;
        try
        {
            // blocking form, switch to TryTake and maybe Thread.Sleep()
            x = queue.Take();
        }
        catch (InvalidOperationException)
        {
            // none left
            break;
        }

        x.Frob();
    }
};

int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);
于 2013-09-10T14:41:28.510 に答える
2

OS スケジューラーはアイテム 10 の実行を一時停止し、アイテム 11 を CPU コアに配置して、11 が 10 の前に実行されるようにすることができるため、この機能は存在できません。ライブラリはそれを打ち消すことはできません。どの作業項目も、いつでも無期限に中断できます。

ただし、おおよその順序を取得できます。それについては、他の回答を参照してください。

于 2013-09-10T14:57:42.723 に答える