単純なパイプラインを作成しようとしているときにBlockingCollection<T>
、Task Parallel Library を使用して、定量化および再現可能な問題が発生しました。ConcurrentQueue<T>
GetConsumingEnumerable
簡単に言えば、あるスレッドからデフォルトBlockingCollection<T>
(フードの下では a に依存している) にエントリを追加しても、メソッドを呼び出している別のスレッドからエントリConcurrentQueue<T>
がポップされることは保証されません。BlockingCollection<T>
GetConsumingEnumerable()
これを再現/シミュレートするための非常に単純なWinformsアプリケーションを作成しました。これは、整数を画面に出力するだけです。
Timer1
作業項目をキューに入れる責任があります... と呼ばれる並行辞書を使用し_tracker
て、ブロッキングコレクションに既に追加されているものを認識します。Timer2
BlockingCollection
の& の両方のカウント状態をログに記録しているだけです_tracker
- START ボタンは
Paralell.ForEach
、ブロッキング コレクションを単純に反復しGetConsumingEnumerable()
、2 番目のリスト ボックスへの出力を開始する を開始します。 - STOP ボタンは
Timer1
、ブロッキング コレクションにエントリが追加されるのを防ぎます。
public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
イベントのシーケンスは次のとおりです。
- スタートを押します
- Timer1 がティックし、ListBox1 が 3 つのメッセージで即座に更新されます (0、1、2 を追加)
- ListBox2 はその後、1 秒間隔で 3 つのメッセージで更新されます
- 処理中 0
- 処理1
- 加工2
- Timer1 がティックし、ListBox1 が 3 つのメッセージで即座に更新されます (3、4、5 を追加)
- ListBox2 は、1 秒間隔で 2 つのメッセージで更新されます。
- 処理3
- 処理 4
- 処理 5が印刷されていない... 「行方不明」になったように見える
- STOP を押して、タイマー 1 によってそれ以上メッセージが追加されないようにします
- 待って…「処理中5」がまだ表示されない
コンカレント ディクショナリは、1 つのアイテムがまだ処理されておらず、その後削除されたことを追跡していることがわかります。_tracker
もう一度 Start を押すと、timer1 がさらに 3 つのエントリを追加し始め、Parallel ループが復活して 5、6、7、8 を出力します。
なぜこれが起こるのか、私は完全に途方に暮れています。start を再度呼び出すと、明らかに Parallell foreach を呼び出す newtask が呼び出され、不足しているエントリを魔法のように見つける GetConsumingEnumerable() が再実行されます...
BlockingCollection.GetConsumingEnumerable()
コレクションに追加されたすべてのアイテムを反復処理することが保証されていないのはなぜですか。
その後、さらにエントリを追加すると、「スタックが解除」されて処理が続行されるのはなぜですか?