4

ここに画像の説明を入力してください私は次のように生産者/消費者キューを持っていますが、私は得ていArgumentWExceptionます。

コードは次のとおりです。

public class ProducerConsumer<T> where T : class
    {
        #region Private Variables
        private Thread _workerThread;
        private readonly Queue<T> _workQueue;
        private  object _enqueueItemLocker = new object();
        private  object _processRecordLocker = new object();
        private readonly Action<T> _workCallbackAction;
        private AutoResetEvent _workerWaitSignal;
        #endregion

        #region Constructor
        public ProducerConsumer(Action<T> action)
        {
            _workQueue = new Queue<T>();
            _workCallbackAction = action;

        }
        #endregion
        #region Private Methods 
        private void ProcessRecord()
        {
            while (true)
            {               
                T workItemToBeProcessed = default(T);
                bool hasSomeWorkItem = false;
                lock (_processRecordLocker)
                {
                    hasSomeWorkItem = _workQueue.Count > 0;

                    if (hasSomeWorkItem)
                    {
                        workItemToBeProcessed = _workQueue.Dequeue();
                        if (workItemToBeProcessed == null)
                        {
                            return;
                        }
                    }
                }
                if (hasSomeWorkItem)
                {
                    if (_workCallbackAction != null)
                    {
                        _workCallbackAction(workItemToBeProcessed);
                    }
                }
                else
                {
                    _workerWaitSignal.WaitOne();
                }
            }
        }
        #endregion

        #region Public Methods
        /// <summary>
        /// Enqueues work item in the queue.
        /// </summary>
        /// <param name="workItem">The work item.</param>
        public void EnQueueWorkItem(T workItem)
        {
            lock (_enqueueItemLocker)
            {               
                _workQueue.Enqueue(workItem);

                if (_workerWaitSignal == null)
                {
                    _workerWaitSignal = new AutoResetEvent(false);
                }

                _workerWaitSignal.Set();
            }
        }
        /// <summary>
        /// Stops the processer, releases wait handles.
        /// </summary>
        /// <param name="stopSignal">The stop signal.</param>
        public void StopProcesser(AutoResetEvent stopSignal)
        {
            EnQueueWorkItem(null);

            _workerThread.Join();
            _workerWaitSignal.Close();
            _workerWaitSignal = null;

            if (stopSignal != null)
            {
                stopSignal.Set();
            }
        }
        /// <summary>
        /// Starts the processer, starts a new worker thread.
        /// </summary>
        public void StartProcesser()
        {
            if (_workerWaitSignal == null)
            {
                _workerWaitSignal = new AutoResetEvent(false);
            }
            _workerThread = new Thread(ProcessRecord) { IsBackground = true };
            _workerThread.Start();
        }
        #endregion
    }

別のクラスは次のとおりです。

public class Tester
{
    private readonly ProducerConsumer<byte[]> _proConsumer;
    public Tester()
    {
        _proConsumer = new ProducerConsumer<byte[]>(Display);
    }
    public void AddData(byte[] data)
    {
        try
        {
            _proConsumer.EnQueueWorkItem(recordData);
        }
        catch (NullReferenceException nre)
        {

        }
    }
    public void Start()
    {
        _proConsumer.StartProcesser();
    }

    private static object _recordLocker = new object();

    private void Display(byte[] recordByteStream)
    {
        try
        {
            lock (_recordLocker)
            {
                Console.WriteLine("Done with data:" + BitConverter.ToInt32(recordByteStream, 0));

            }

        }
        catch (Exception ex)
        {

        }

    }
}

そして私の主な機能:

class Program
    {
        private static Tester _recorder;
        static void Main(string[] args)
        {
            _recorder = new Tester();
            _recorder.StartRecording();

            for (int i = 0; i < 100000; i++)
            {
                _recorder.AddRecordData(BitConverter.GetBytes(i));              
            }

            Console.Read();
        }
    }

なぜ例外が発生するのか、それを回避するにはどうすればよいのでしょうか。

4

1 に答える 1

8

現在の実装では、クラスはスレッドセーフではありません。Enqueuelock (_enqueueItemLocker))とDequeue( )の呼び出しに2つの異なるオブジェクトを使用しているため、。lock (_processRecordLocker)に競合状態が発生しますQueue<T>

キューを安全に使用するには、両方の呼び出しで同じオブジェクトインスタンスをロックする必要があります。

.NET 4を使用している場合は、スレッドセーフであるため、コード内のロックが不要になるため、ConcurrentQueue<T>または代わりに使用することをお勧めします。BlockingCollection<T>

于 2012-12-11T23:07:31.380 に答える