15

キューから待機中のすべてのアイテムを消費するコードをいくつか考え出しました。アイテムを 1 つずつ処理するのではなく、すべての待機アイテムをセットとして処理する方が理にかなっています。

このようにキューを宣言しました。

private BlockingCollection<Item> items = 
    new BlockingCollection<Item>(new ConcurrentQueue<Item>);

次に、コンシューマー スレッドで、次のようにアイテムをバッチで読み取る予定です。

Item nextItem;
while (this.items.TryTake(out nextItem, -1))
{
    var workToDo = new List<Item>();
    workToDo.Add(nextItem);

    while(this.items.TryTake(out nextItem))
    {
        workToDo.Add(nextItem);
    }

    // process workToDo, then go back to the queue.
}

このアプローチには有用性が欠けておりGetConsumingEnumerable、より良い方法を見逃したのではないか、それとも私のアプローチに欠陥があるのではないかと思わずにはいられません。

BlockingCollection<T>をバッチで消費するより良い方法はありますか?

4

2 に答える 2

2

いくつかの点でそれほど良くはありませんがConcurrentQueue<T>、私自身はAtomicDequeueAllLLQueue<T>メソッドを使用してバッチ化されたデキューを可能にします。現在キューにあるすべてのアイテムは、単一の (アトミックでスレッドセーフな) 操作でそこから取り出され、非シングルスレッドで消費するためのスレッドセーフなコレクション。このメソッドは、読み取り操作をバッチ処理するシナリオ向けに正確に設計されています。

ただし、これはブロッキングではありませんが、ブロッキング コレクションを簡単に作成するために使用できます。

public BlockingBatchedQueue<T>
{
  private readonly AutoResetEvent _are = new AutoResetEvent(false);
  private readonly LLQueue<T> _store;
  public void Add(T item)
  {
    _store.Enqueue(item);
    _are.Set();
  }
  public IEnumerable<T> Take()
  {
    _are.WaitOne();
    return _store.AtomicDequeueAll();
  }
  public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
  {
    if(_are.WaitOne(millisecTimeout))
    {
      items = _store.AtomicDequeueAll();
      return true;
    }
    items = null;
    return false;
  }
}

これは、次のことを行わない出発点です。

  1. 破棄時に保留中の待機中のリーダーを処理します。
  2. 複数のリーダーとの潜在的な競合について心配してください。両方とも、読み取り中に発生した書き込みによってトリガーされます (時折空の列挙可能な結果が問題ないと見なされるだけです)。
  3. 書き込みに上限を設定します。

これらすべてを追加することもできますが、上記で定義された制限内でバグが発生しないことを願って、いくつかの実用的な使用を最小限に抑えたいと思いました.

于 2012-09-04T09:38:18.893 に答える