1

可変数のワーカースレッドを作成し、それらの間でタスクを分散するスレッドがあります。これは、以下に実装を示すTaskQueueオブジェクトをスレッドに渡すことで解決されます。

これらのワーカースレッドは、与えられたTaskQueueオブジェクトを繰り返し処理し、各タスクを実行します。

private class TaskQueue : IEnumerable<Task>
{
    public int Count
    {
        get
        {
            lock(this.tasks)
            {
                return this.tasks.Count;
            }
        }
    }

    private readonly Queue<Task> tasks = new Queue<Task>();
    private readonly AutoResetEvent taskWaitHandle = new AutoResetEvent(false);

    private bool isFinishing = false;
    private bool isFinished = false;

    public void Enqueue(Task task)
    {
        Log.Trace("Entering Enqueue, lock...");
        lock(this.tasks)
        {
            Log.Trace("Adding task, current count = {0}...", Count);
            this.tasks.Enqueue(task);

            if (Count == 1)
            {
                Log.Trace("Count = 1, so setting the wait handle...");
                this.taskWaitHandle.Set();
            }
        }
        Log.Trace("Exiting enqueue...");
    }

    public Task Dequeue()
    {
        Log.Trace("Entering Dequeue...");
        if (Count == 0)
        {
            if (this.isFinishing)
            {
                Log.Trace("Finishing (before waiting) - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }

            Log.Trace("Count = 0, lets wait for a task...");
            this.taskWaitHandle.WaitOne();
            Log.Trace("Wait handle let us through, Count = {0}, IsFinishing = {1}, Returned = {2}", Count, this.isFinishing);

            if(this.isFinishing)
            {
                Log.Trace("Finishing - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }
        }

        Log.Trace("Entering task lock...");
        lock(this.tasks)
        {
            Log.Trace("Entered task lock, about to dequeue next item, Count = {0}", Count);
            return this.tasks.Dequeue();
        }
    }

    public void Finish()
    {
        Log.Trace("Setting TaskQueue state to isFinishing = true and setting wait handle...");
        this.isFinishing = true;

        if (Count == 0)
        {
            this.taskWaitHandle.Set();
        }
    }

    public IEnumerator<Task> GetEnumerator()
    {
        while(true)
        {
            Task t = Dequeue();
            if(this.isFinished)
            {
                yield break;
            }

            yield return t;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

ご覧のとおり、AutoResetEventオブジェクトを使用して、ワーカースレッドが途中で終了しないようにしています。つまり、タスクを取得する前です。

一言で言えば:

  • メインスレッドは、タスクをそのTaskQueueにEnqeueueすることによってスレッドにタスクを割り当てます
  • メインスレッドは、TaskQueueのFinish()メソッドを呼び出すことにより、実行するタスクがなくなったことをスレッドに通知します。
  • ワーカースレッドは、TaskQueueのDequeue()メソッドを呼び出して、割り当てられた次のタスクを取得します

問題は、Dequeue()メソッドがInvalidOperationExceptionをスローし、キューが空であることを示していることです。ご覧のとおり、ログを追加しましたが、 Set()メソッドの呼び出しがなかったとしても、 AutoResetEventはDequeue ()をブロックしないことがわかりました。

私が理解しているように、AutoResetEvent.Set()を呼び出すと、待機中のスレッド(以前はAutoResetEvent.WaitOne()を呼び出していた)を続行でき、次にAutoResetEvent.Reset()を自動的に呼び出して、次のウェイターをブロックします。

では、何が間違っている可能性がありますか?私は何か間違ったことをしましたか?どこかにエラーがありますか?私は今この上に3時間座っていますが、何が悪いのか理解できません。私を助けてください!

どうもありがとうございます!

4

3 に答える 3

6

デキューコードが正しくありません。カウントをロックしてチェックし、ズボンの縫い目を飛ばして、タスクに何かがあることを期待します。ロックを解除している間は、仮定を保持することはできません:)。カウントチェックとタスク。デキューはロックされた状態で発生する必要があります。

bool TryDequeue(out Tasks task)
{
  task = null;
  lock (this.tasks) {
    if (0 < tasks.Count) {
      task = tasks.Dequeue();
    }
  }
  if (null == task) {
    Log.Trace ("Queue was empty");
  }
  return null != task;
 }

Enqueue()コードも同様に問題に満ちています。エンキュー/デキューは進行状況を保証しません(キューにアイテムがある場合でも、待機中のデキュースレッドがブロックされます)。の署名Enqueue()が間違っています。全体的に、あなたの投稿は非常に貧弱なコードです。率直に言って、あなたはここで噛むことができる以上に噛もうとしていると思います...ああ、そして決してロックの下でログインしないでください。

ConcurrentQueueを使用することを強くお勧めします。

.Net 4.0にアクセスできない場合は、次の実装を使用して開始できます。

public class ConcurrentQueue<T>:IEnumerable<T>
{
    volatile bool fFinished = false;
    ManualResetEvent eventAdded = new ManualResetEvent(false);
    private Queue<T> queue = new Queue<T>();
    private object syncRoot = new object();

    public void SetFinished()
    {
        lock (syncRoot)
        {
            fFinished = true;
            eventAdded.Set();
        }
    }

    public void Enqueue(T t)
    {
        Debug.Assert (false == fFinished);
        lock (syncRoot)
        {
            queue.Enqueue(t);
            eventAdded.Set();
        }
    }

    private bool Dequeue(out T t)
    {
        do
        {
            lock (syncRoot)
            {
                if (0 < queue.Count)
                {
                    t = queue.Dequeue();
                    return true;
                }
                if (false == fFinished)
                {
                    eventAdded.Reset ();
                }
            }
            if (false == fFinished)
            {
                eventAdded.WaitOne();
            }
            else
            {
                break;
            }
        } while (true);
        t = default(T);
        return false;
    }


    public IEnumerator<T> GetEnumerator()
    {
        T t;
        while (Dequeue(out t))
        {
            yield return t;
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
于 2010-08-25T17:17:44.090 に答える
3

私からのより詳細な回答は保留中ですが、非常に重要なことを指摘したいと思います。

.NET 3.5を使用している場合は、クラスを使用できますConcurrentQueue<T>。バックポートは、 .NET3.5で使用可能なRx拡張機能ライブラリに含まれています。

ブロッキング動作が必要なため、aConcurrentQueue<T>をaでラップする必要がありBlockingCollection<T>ます(Rxの一部としても利用可能)。

于 2010-08-25T18:21:04.393 に答える
1

ブロッキングキューを複製しようとしているようです。1つは.NET4.0BCLにBlockingCollectionとしてすでに存在します。.NET 4.0がオプションでない場合は、このコードを使用できます。Monitor.Waitの代わりにandMonitor.Pulseメソッドを使用しAutoResetEventます。

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take() // Dequeue
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0)
            {
                Monitor.Wait(m_Queue);
            }
            return m_Queue.Dequeue();
        }
    }

    public void Add(T data) // Enqueue
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(data);
            Monitor.Pulse(m_Queue);
        }
    }
}

アップデート:

複数のプロデューサーと複数のコンシューマーに対してスレッドセーフにしたい場合は、プロデューサーとコンシューマーのキューを実装することはできないと確信していますAutoResetEvent誰かが反例を思い付くことができれば、間違っていることが証明される準備ができています) 。確かに、あなたはインターネット上で例を見るでしょう、しかしそれらはすべて間違っています。実際、Microsoftによるそのような試みの1つには、キューがライブロックされる可能性があるという欠陥があります。

于 2010-08-25T17:32:35.693 に答える