1

AsParallel() が事前に読み取り、内部バッファに入れるアイテムの量を制限するにはどうすればよいですか?

次に例を示します。

int returnedCounter;

IEnumerable<int> Enum()
{
    while (true)
        yield return Interlocked.Increment(ref returnedCounter);
}

[TestMethod]
public void TestMethod1()
{
    foreach (var i in Enum().AsParallel().Select(a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine(returnedCounter);
}

アイテムを 1 つ消費し、スリープし、列挙を停止します。私のマシンでは 526400 と出力されます。私の実際のプロジェクトでは、各アイテムに数千キロバイトが割り当てられます。AsParallel() は多くのアイテムを前もって読み取るため、メモリ消費が非常に悪くなり、CPU が浪費されます。

WithMergeOptions(ParallelMergeOptions.NotBuffered) を入れると少し役に立ちます。4544 と出力されますが、それでも私には多すぎます。

Enum() で待機すると、メイン スレッドでループがフリーズします。

4

2 に答える 2

4

Partitionersについての別の質問!

あなたの場合、一度に 1 つのアイテムのみを取るパーティショナーを見つけて作成する必要があります。

カスタムパーティショナーに関する記事はこちら


アップデート:

SingleItemPartitioner実装を見た場所を思い出しました: それはParallelExtensionsExtrasここのプロジェクトにあります: Samples for Parallel Programming with the .NET Framework

私もあなたのテストコードを読んだところです。私はおそらくそれを最初にやるべきだった!

このコード:

Enum().AsParallel().Select(a => a)

手段:Enum()それを可能な限り速く並列に取得して列挙し、新しい を返しIEnumerable<int>ます。

したがって、foreachアイテムをプルするのではなく、linq ステートメントによって作成されEnum()た新しいアイテムからアイテムをプルします。IEnumerable<int>

また、foreachメインスレッドで実行されるため、各アイテムの作業はシングルスレッドです。

並行して実行したいが、必要な場合にのみアイテムを生成する場合は、次を試してください。

Parallel.ForEach( SingleItemPartitioner.Create( Enum() ), ( i, state ) =>
    {
        Thread.Sleep( 3000 );
        state.Break();
    }
于 2011-12-07T15:59:32.927 に答える
0

回避策を見つけました。

まず、元の質問を明確にさせてください。無限シーケンスで動作する一時停止可能なパイプラインが必要です。パイプラインは次のとおりです。

  1. 同期的にシーケンスから読み取る:Enum()
  2. アイテムを並行して処理する:AsParallel().Select(a => a)
  3. 同期処理を進める: foreachbody

ステップ 3 でパイプラインが一時停止する場合があります。によってエミュレートされSleep()ます。問題は、パイプラインが一時停止しているときに、ステップ 2 が先にフェッチする要素が多すぎることです。PLinq には内部キューが必要です。キュー サイズを明示的に構成することはできません。サイズにParallelMergeOptionsもよりますが。ParallelMergeOptions.NotBufferedキューのサイズを下げますが、サイズがまだ大きすぎます。

私の回避策は、処理されている項目の数を把握し、制限に達したら並列処理を停止し、パイプラインが再び開始されたときに並列処理を再開することです。

int sourceCounter;

IEnumerable<int> SourceEnum() // infinite input sequence
{
    while (true)
        yield return Interlocked.Increment(ref sourceCounter);
}

[TestMethod]
public void PlainPLinq_PausedConsumtionTest()
{
    sourceCounter = 0;
    foreach (var i in SourceEnum().AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter); // prints 4544 on my machine
}

[TestMethod]
public void MyParallelSelect_NormalConsumtionTest()
{
    sourceCounter = 0;
    foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
    {
        if (sourceCounter > 1000000)
            break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}

[TestMethod]
public void MyParallelSelect_PausedConsumtionTest()
{
    sourceCounter = 0;
    foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}

class DataHolder<D> // reference type to store class or struct D
{
    public D Data;
}

static IEnumerable<DataHolder<T>> FetchSourceItems<T>(IEnumerator<T> sourceEnumerator, DataHolder<int> itemsBeingProcessed, int queueSize)
{
    for (; ; )
    {
        var holder = new DataHolder<T>();
        if (Interlocked.Increment(ref itemsBeingProcessed.Data) > queueSize)
        {
            // many enought items are already being processed - stop feeding parallel processing
            Interlocked.Decrement(ref itemsBeingProcessed.Data);
            yield break;
        }
        if (sourceEnumerator.MoveNext())
        {
            holder.Data = sourceEnumerator.Current;
            yield return holder;
        }
        else
        {
            yield return null; // return null DataHolder to indicate EOF
            yield break;
        }
    }
}

IEnumerable<OutT> MyParallelSelect<T, OutT>(IEnumerable<T> source, int queueSize, Func<T, OutT> selector)
{
    var itemsBeingProcessed = new DataHolder<int>();
    using (var sourceEnumerator = source.GetEnumerator())
    {
        for (;;) // restart parallel processing
        {
            foreach (var outData in FetchSourceItems(sourceEnumerator, itemsBeingProcessed, queueSize).AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(
                inData => inData != null ? new DataHolder<OutT> { Data = selector(inData.Data) } : null))
            {
                Interlocked.Decrement(ref itemsBeingProcessed.Data);
                if (outData == null)
                    yield break; // EOF reached
                yield return outData.Data;
            }
        }
    }
}
于 2011-12-09T11:31:04.910 に答える