18

単純なパイプラインを作成しようとしているときにBlockingCollection<T>、Task Parallel Library を使用して、定量化および再現可能な問題が発生しました。ConcurrentQueue<T>GetConsumingEnumerable

簡単に言えば、あるスレッドからデフォルトBlockingCollection<T>(フードの下では a に依存している) にエントリを追加しても、メソッドを呼び出している別のスレッドからエントリConcurrentQueue<T>がポップされることは保証されません。BlockingCollection<T>GetConsumingEnumerable()

これを再現/シミュレートするための非常に単純なWinformsアプリケーションを作成しました。これは、整数を画面に出力するだけです。

  • Timer1作業項目をキューに入れる責任があります... と呼ばれる並行辞書を使用し_trackerて、ブロッキングコレクションに既に追加されているものを認識します。
  • Timer2BlockingCollectionの& の両方のカウント状態をログに記録しているだけです_tracker
  • START ボタンはParalell.ForEach、ブロッキング コレクションを単純に反復しGetConsumingEnumerable()、2 番目のリスト ボックスへの出力を開始する を開始します。
  • STOP ボタンはTimer1、ブロッキング コレクションにエントリが追加されるのを防ぎます。
public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for(var i = 0; i < 3; i++,Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
            _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2

        var options = new ParallelOptions {
                                CancellationToken = _tokenSource.Token,
                                MaxDegreeOfParallelism = 1
                            };

        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }

イベントのシーケンスは次のとおりです。

  • スタートを押します
  • Timer1 がティックし、ListBox1 が 3 つのメッセージで即座に更新されます (0、1、2 を追加)
  • ListBox2 はその後、1 秒間隔で 3 つのメッセージで更新されます
    • 処理中 0
    • 処理1
    • 加工2
  • Timer1 がティックし、ListBox1 が 3 つのメッセージで即座に更新されます (3、4、5 を追加)
  • ListBox2 は、1 秒間隔で 2 つのメッセージで更新されます。
    • 処理3
    • 処理 4
    • 処理 5が印刷されていない... 「行方不明」になったように見える
  • STOP を押して、タイマー 1 によってそれ以上メッセージが追加されないようにします
  • 待って…「処理中5」がまだ表示されない

エントリがありません

コンカレント ディクショナリは、1 つのアイテムがまだ処理されておらず、その後削除されたことを追跡していることがわかります。_tracker

もう一度 Start を押すと、timer1 がさらに 3 つのエントリを追加し始め、Parallel ループが復活して 5、6、7、8 を出力します。

後続のアイテムが後ろに押し込まれた後、エントリが返されました

なぜこれが起こるのか、私は完全に途方に暮れています。start を再度呼び出すと、明らかに Parallell foreach を呼び出す newtask が呼び出され、不足しているエントリを魔法のように見つける GetConsumingEnumerable() が再実行されます...

BlockingCollection.GetConsumingEnumerable()コレクションに追加されたすべてのアイテムを反復処理することが保証されていないのはなぜですか。

その後、さらにエントリを追加すると、「スタックが解除」されて処理が続行されるのはなぜですか?

4

4 に答える 4

20

GetConsumingEnumerable()では使えませんParallel.ForEach()

TPL エクストラGetConsumingPartitionerのを使用する

ブログ投稿では、使用できない理由についても説明しますGetConsumingEnumerable()

Parallel.ForEach と PLINQ の両方で既定で採用されているパーティショニング アルゴリズムは、同期コストを最小限に抑えるためにチャンクを使用します。要素ごとに 1 回ロックを取得するのではなく、ロックを取得し、要素のグループ (チャンク) を取得してから、ロックを解除します。

つまり、Parallel.ForEach は作業項目のグループを受け取るまで待ってから続行します。まさにあなたの実験が示すもの。

于 2012-04-18T12:19:58.087 に答える
2

基本的に同じことを行う単純なコンソール アプリケーション (.Net 4.5 ベータ版で実行されているため、違いが生じる可能性があります) では、あなたの動作を再現できませんでした。Parallel.ForEach()しかし、これが発生する理由は、入力コレクションをチャンクに分割して実行を最適化しようとするためだと思います。また、列挙可能なものでは、コレクションにアイテムを追加するまでチャンクを作成できません。詳細については、MSDNの「PLINQ および TPL のカスタム パーティショナー」を参照してください。

これを修正するには、使用しないでくださいParallel.ForEach()。それでもアイテムを並行して処理したい場合はTask、各反復で a を開始できます。

于 2012-04-18T12:10:50.570 に答える
0

Parallel.foreachを実行する前にBlockingCollectionの.CompleteAdding()メソッドを呼び出すことができる場合は、上記の問題は問題にならないことに注意してください。私はこれらの2つのオブジェクトを何度も一緒に使用して、すばらしい結果を出しました。

さらに、CompleteAdding()を呼び出した後、いつでもBlockingCollectionを再設定して、必要に応じてアイテムを追加できます(_entries = new BlockingCollection();)

上記のクリックイベントコードを次のように変更すると、エントリが見つからないという問題が解決され、開始ボタンと停止ボタンを複数回クリックした場合に期待どおりに機能するようになります。

private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
    timer1.Stop();
    timer1.Enabled = false;
>>>>_entries.CompleteAdding();
>>>>_entries = new BlockingCollection<int>();
}
于 2012-11-04T06:00:42.910 に答える