ここで何をアーカイブしようとしているのか、100%わかりません。何も残らなくなるまで、すべてのアイテムをデキューしようとしていますか?または、一度にたくさんのアイテムをデキューしますか?
最初のおそらく予期しない動作は、次のステートメントで始まります。
theQueue.AsParallel()
ConcurrentQueueの場合、「スナップショット」-列挙子を取得します。したがって、並行スタックを反復処理する場合は、スナップショットのみを反復処理し、「ライブ」キューは反復処理しません。
一般に、反復中に変更しているものを反復することはお勧めできません。
したがって、別のソリューションは次のようになります。
// this way it's more clear, that we only deque for theQueue.Count items
// However after this, the queue is probably not empty
// or maybe the queue is also empty earlier
Parallel.For(0, theQueue.Count,
new ParallelOptions() {MaxDegreeOfParallelism = 20},
() => {
theQueue.TryDequeue(); //and stuff
});
これにより、反復中に何かを操作する必要がなくなります。ただし、そのステートメントの後でも、forループ中に追加されたデータをキューに含めることができます。
キューを一時的に空にするには、おそらくもう少し作業が必要です。これは本当に醜い解決策です。キューにまだアイテムがある間に、新しいタスクを作成します。各タスクの開始は、可能な限りキューからデキューします。最後に、すべてのタスクが終了するのを待ちます。並列処理を制限するために、20を超えるタスクを作成することはありません。
// Probably a kitty died because of this ugly code ;)
// However, this code tries to get the queue empty in a very aggressive way
Action consumeFromQueue = () =>
{
while (tt.TryDequeue())
{
; // do your stuff
}
};
var allRunningTasks = new Task[MaxParallism];
for(int i=0;i<MaxParallism && tt.Count>0;i++)
{
allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);
}
Task.WaitAll(allRunningTasks);