166

複数のスレッドがキューに追加され、複数のスレッドが同じキューから読み取られるシナリオがあります。キューが特定のサイズに達すると、アイテムがキューから削除されるまで、キューをいっぱいにしているすべてのスレッドが追加時にブロックされます。

以下の解決策は私が現在使用しているものであり、私の質問は次のとおりです。これをどのように改善できますか? 使用する必要がある BCL でこの動作を既に有効にしているオブジェクトはありますか?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
4

10 に答える 10

206

これは非常に安全ではないようです (同期がほとんど行われていません)。次のようなものはどうですか:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(編集)

実際には、リーダーがきれいに終了し始めるようにキューを閉じる方法が必要です-おそらくboolフラグのようなものです-設定されている場合、空のキューは(ブロックするのではなく)単に戻ります:

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
于 2009-02-09T22:05:20.590 に答える
60

.net 4 BlockingCollection を使用し、Add() を使用してエンキューし、Take() を使用してデキューします。内部的に非ブロッキング ConcurrentQueue を使用します。詳細はこちらFast and Best Producer/Consumer キュー テクニック BlockingCollection とコンカレント キュー

于 2011-12-09T15:22:55.397 に答える
14

「どうすればこれを改善できますか?」

クラス内のすべてのメソッドを調べて、別のスレッドがそのメソッドまたは他のメソッドを同時に呼び出した場合にどうなるかを検討する必要があります。たとえば、Removeメソッドにロックを設定しますが、Addメソッドにはロックを設定しません。あるスレッドが追加すると同時に別のスレッドが削除するとどうなりますか?悪いこと。

また、メソッドが、最初のオブジェクトの内部データへのアクセスを提供する2番目のオブジェクト(たとえば、GetEnumerator)を返すことができることも考慮してください。1つのスレッドがその列挙子を通過し、別のスレッドが同時にリストを変更していると想像してください。良くない。

経験則として、クラス内のメソッドの数を最小限に抑えることで、これを簡単に正しく実行できるようにすることです。

特に、別のコンテナクラスを継承しないでください。これは、そのクラスのすべてのメソッドを公開し、呼び出し元が内部データを破損したり、データへの部分的に完全な変更を確認したりする方法を提供するためです(データがその時点で破損しているように見えます)。すべての詳細を非表示にし、それらへのアクセスを許可する方法について完全に冷酷になります。

既成のソリューションを使用することを強くお勧めします。スレッド化に関する本を入手するか、サードパーティのライブラリを使用してください。そうでなければ、あなたが試みていることを考えると、あなたは長い間あなたのコードをデバッグするでしょう。

また、呼び出し元が特定のアイテムを選択するよりも、Removeがアイテム(たとえば、キュ​​ーであるため最初に追加されたアイテム)を返す方が理にかなっていますか?また、キューが空の場合、おそらくRemoveもブロックする必要があります。

更新:マークの答えは実際にこれらすべての提案を実装しています!:)しかし、彼のバージョンがなぜそのような改善であるかを理解するのに役立つかもしれないので、これをここに残しておきます。

于 2009-02-09T22:51:03.727 に答える
13

System.Collections.Concurrent 名前空間でBlockingCollectionConcurrentQueueを使用できます。

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}
于 2013-02-06T10:39:38.103 に答える
6

Reactive Extensions を使用してこれをノックアップし、次の質問を思い出しました。

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

必ずしも完全に安全というわけではありませんが、非常に単純です。

于 2010-05-07T14:14:36.027 に答える
5

これは、スレッド セーフな制限付きブロッキング キューを作成するために私がやってきたものです。

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}
于 2009-05-28T12:55:36.713 に答える
2

私はTPLを完全には調査していませんが、TPLにはあなたのニーズに合ったものがあるかもしれません。少なくとも、いくつかのインスピレーションを得るためのリフレクター飼料があるかもしれません。

お役に立てば幸いです。

于 2009-02-09T22:11:23.913 に答える
0

まあ、あなたはSystem.Threading.Semaphoreクラスを見るかもしれません。それ以外 - いいえ、これは自分で作成する必要があります。私の知る限り、そのような組み込みのコレクションはありません。

于 2009-02-09T22:05:22.133 に答える
-1

最大のスループットが必要で、複数のリーダーが読み取り、1 つのライターだけが書き込みできるようにする場合、BCL には ReaderWriterLockSlim と呼ばれるものがあり、コードをスリム化するのに役立ちます...

于 2009-02-09T22:04:32.213 に答える