1

私の基本的な問題は、キューが空の場合はすぐにキューからアイテムを処理する必要があるか、キューにアイテムを追加して、すでに処理されているアイテムがある場合は終了する必要があることです。

私は、ピークを使用して物事を単純化する手法を試していますが、邪魔になる可能性のある落とし穴があるのではないかと考えています。ありがとう!

    void SequenceAction(Action action) {
       bool go = false;

       lock (_RaiseEventQueueLock) {
          _RaiseEventQueue.Enqueue(action);
          go = (_RaiseEventQueue.Count == 1); 
       }

       // 'go' can only be true if queue was empty when queue 
       //  was locked and item was enqueued.
       while (go) {
          #if naive_threadsafe_presumption 
          // Peek is threadsafe because in effect this loop owns
          //  the zeroeth item in the queue for as long as the queue 
          //  remains non-empty.
          (_RaiseEventQueue.Peek())();

          #else
          Action a;
          lock (_RaiseEventQueueLock) {
             a = _RaiseEventQueue.Peek();
          }
          a();
          #endif   

          // Lock the queue to see if any item was enqueued while
          //  the zeroeth item was being processed.
          // Note that the code processing an item can call this 
          //  function reentrantly, adding to its own todo list
          //  while insuring that that each item is processed 
          //  to completion.
          lock (_RaiseEventQueueLock) {
             _RaiseEventQueue.Dequeue();
             go = (_RaiseEventQueue.Count > 0); 
          }
       }
    }
4

2 に答える 2

1

実際、あなたPeekはスレッドセーフではありません。キューにアイテムを追加すると、バッキングストア(最終的には配列)のサイズが変更される可能性があります。キューは、挿入と削除のためのヘッドインデックスとテールインデックスを備えた循環バッファに実装されていると思います。

たとえば、キュ​​ーに16個のアイテムがある場合にどうなるか想像してみてください。挿入は8で、削除は9です。キューがいっぱいです。次に、これが発生します。

  1. スレッドAはPeekを呼び出し、Removeインデックス(9)を取得します。
  2. スレッドAがスワップアウトされます。
  3. スレッドBはエンキューを呼び出し、キューを拡張する必要があることを確認します。
  4. スレッドBは、32項目の新しい配列を割り当て、既存の配列からデータをコピーします。データは、削除からラップアラウンドの順にコピーされます。
  5. スレッドBは、Removeを0に、Insertを16に設定します。
  6. スレッドAは次のタイムスライスを取得し、位置9でアイテムを返します。
  7. イベントを順不同で処理したばかりで、最終的には再び処理することになります。
  8. さらに悪いことに、位置0のアイテムを処理せずに削除します。

次の方法でその問題を解決できる可能性があります。

Action nextAction;
lock (_RaiseEventQueueLock)
{
    nextAction = _RaiseEventQueue.Peek();
}
nextAction();

しかし、私はそれにプロとしてのキャリアを賭けるつもりはありません。BlockingCollectionとプロデューサー/コンシューマーデザインを使用することをお勧めします。

可能な修正

私は、次のことがあなたの意図したことをするはずだと思います。

private readonly object _queueLock = new object();
private readonly object _processLock = new object();

void SequenceAction(Action action)
{
    lock (_queueLock)
    {
        _RaiseEventQueue.Enqueue(action);
    }
    if (Monitor.TryEnter(_processLock))
    {
        while (true)
        {
            Action a;
            lock (_queueLock)
            {
                if (_RaiseEventQueue.Count == 0) return;
                a = _RaiseEventQueue.Dequeue();
            }
            a();
        }
        Monitor.Exit(_processLock);
    }
}
于 2013-03-06T03:56:54.563 に答える
1
    // If action already in progress, add new
    //  action to queue and return.
    // If no action in progress, begin processing
    //  the new action and continue processing
    //  actions added to the queue in the meantime.
    void SequenceAction(Action action) {
       lock (_SequenceActionQueueLock) {
          _SequenceActionQueue.Enqueue(action);
          if (_SequenceActionQueue.Count > 1) {
             return;
          }
       }
       // Would have returned if queue was not empty
       //  when queue was locked and item was enqueued.
       for (;;) {
          action();
          lock (_SequenceActionQueueLock) {
             _SequenceActionQueue.Dequeue();
             if (_SequenceActionQueue.Count == 0) {
                return;
             }
             action = _SequenceActionQueue.Peek();
          }
       }
    }
于 2013-03-07T01:19:10.360 に答える