回避策を見つけました。
まず、元の質問を明確にさせてください。無限シーケンスで動作する一時停止可能なパイプラインが必要です。パイプラインは次のとおりです。
- 同期的にシーケンスから読み取る:
Enum()
- アイテムを並行して処理する:
AsParallel().Select(a => a)
- 同期処理を進める:
foreach
body
ステップ 3 でパイプラインが一時停止する場合があります。によってエミュレートされSleep()
ます。問題は、パイプラインが一時停止しているときに、ステップ 2 が先にフェッチする要素が多すぎることです。PLinq には内部キューが必要です。キュー サイズを明示的に構成することはできません。サイズにParallelMergeOptions
もよりますが。ParallelMergeOptions.NotBuffered
キューのサイズを下げますが、サイズがまだ大きすぎます。
私の回避策は、処理されている項目の数を把握し、制限に達したら並列処理を停止し、パイプラインが再び開始されたときに並列処理を再開することです。
int sourceCounter;
IEnumerable<int> SourceEnum() // infinite input sequence
{
while (true)
yield return Interlocked.Increment(ref sourceCounter);
}
[TestMethod]
public void PlainPLinq_PausedConsumtionTest()
{
sourceCounter = 0;
foreach (var i in SourceEnum().AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(a => a))
{
Thread.Sleep(3000);
break;
}
Console.WriteLine("fetched from source sequence: {0}", sourceCounter); // prints 4544 on my machine
}
[TestMethod]
public void MyParallelSelect_NormalConsumtionTest()
{
sourceCounter = 0;
foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
{
if (sourceCounter > 1000000)
break;
}
Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}
[TestMethod]
public void MyParallelSelect_PausedConsumtionTest()
{
sourceCounter = 0;
foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
{
Thread.Sleep(3000);
break;
}
Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}
class DataHolder<D> // reference type to store class or struct D
{
public D Data;
}
static IEnumerable<DataHolder<T>> FetchSourceItems<T>(IEnumerator<T> sourceEnumerator, DataHolder<int> itemsBeingProcessed, int queueSize)
{
for (; ; )
{
var holder = new DataHolder<T>();
if (Interlocked.Increment(ref itemsBeingProcessed.Data) > queueSize)
{
// many enought items are already being processed - stop feeding parallel processing
Interlocked.Decrement(ref itemsBeingProcessed.Data);
yield break;
}
if (sourceEnumerator.MoveNext())
{
holder.Data = sourceEnumerator.Current;
yield return holder;
}
else
{
yield return null; // return null DataHolder to indicate EOF
yield break;
}
}
}
IEnumerable<OutT> MyParallelSelect<T, OutT>(IEnumerable<T> source, int queueSize, Func<T, OutT> selector)
{
var itemsBeingProcessed = new DataHolder<int>();
using (var sourceEnumerator = source.GetEnumerator())
{
for (;;) // restart parallel processing
{
foreach (var outData in FetchSourceItems(sourceEnumerator, itemsBeingProcessed, queueSize).AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(
inData => inData != null ? new DataHolder<OutT> { Data = selector(inData.Data) } : null))
{
Interlocked.Decrement(ref itemsBeingProcessed.Data);
if (outData == null)
yield break; // EOF reached
yield return outData.Data;
}
}
}
}