4

キューにデータを追加しているスレッドが 1 つあります。データが追加されたときに他のスレッドに通知して、キューからデータの処理を開始できるようにする必要があります。

1つのオプションは、カウントがゼロより大きいかどうかを確認するためにスレッドがキューを継続的にポーリングすることですが、これは良い方法ではないと思います。他の提案は大歓迎です

これを達成する方法について何か提案があれば、.net フレームワーク 3.5 を使用しています。

そして、1 つが実行しているスレッド q.Enqueue(data)ともう1 つが実行している 2 つのスレッドがあるq.dequeue()場合、この場合、ロックを管理する必要がありますか..?

4

4 に答える 4

6

ManualResetEventスレッドへの通知に使用できます。

ManualResetEvent e = new ManualResetEvent(false);

q.enqueue();doの後e.Set()、処理スレッドで、 のアイテムを待ちますe.WaitOne()

ループ内で処理を行う場合は、 のe.Reset()直後に行う必要がありますe.WaitOne()

于 2013-09-19T07:06:41.437 に答える
1

バッチ処理したいので、キューは使用しません。これは、(ログ) ファイルを開いたり閉じたり、データベースを開いたり閉じたりする必要がある場合に便利です。これは私がそのようなものを作成する方法の例です:

// J. van Langen
public abstract class QueueHandler<T> : IDisposable
{
    // some events to trigger.
    ManualResetEvent _terminating = new ManualResetEvent(false);
    ManualResetEvent _terminated = new ManualResetEvent(false);
    AutoResetEvent _needProcessing = new AutoResetEvent(false);

    // my 'queue'
    private List<T> _queue = new List<T>();

    public QueueHandler()
    {
        new Thread(new ThreadStart(() =>
        {
            // what handles it should wait on.
            WaitHandle[] handles = new WaitHandle[] { _terminating, _needProcessing };

            // while not terminating, loop (0 timeout)
            while (!_terminating.WaitOne(0))
            {
                // wait on the _terminating and the _needprocessing handle.
                WaitHandle.WaitAny(handles);

                // my temporay array to store the current items.
                T[] itemsCopy;

                // lock the queue
                lock (_queue)
                {
                    // create a 'copy'
                    itemsCopy = _queue.ToArray();

                    // clear the queue.
                    _queue.Clear();
                }

                if (itemsCopy.Length > 0)
                    HandleItems(itemsCopy);
            }

            // the thread is done.
            _terminated.Set();

        })).Start();
    }

    public abstract void HandleItems(T[] items);

    public void Enqueue(T item)
    {
        // lock the queue to add the item.
        lock (_queue)
            _queue.Add(item);

        _needProcessing.Set();
    }

    // batch
    public void Enqueue(IEnumerable<T> items)
    {
        // lock the queue to add multiple items.
        lock (_queue)
            _queue.AddRange(items);

        _needProcessing.Set();
    }

    public void Dispose()
    {
        // let the thread know it should stop.
        _terminating.Set();

        // wait until the thread is stopped.
        _terminated.WaitOne();
    }

}

_terminating/_terminatedには a を使用しますManualResetEvent。それらは設定されたものだけだからです。

_needProcessing私が使用するのはAutoResetEvent、ManualResetEvent では実行できません。これは、トリガーされたときに別のスレッドがSet再び実行できるためです。そのためReset、WaitHandle.WaitAny の後に追加すると、新しく追加された項目を元に戻すことができます。(うーん、誰かがこれをもっと簡単に説明できるなら、大歓迎です。:)

例:

public class QueueItem
{
}

public class MyQueue : QueueHandler<QueueItem>
{
    public override void HandleItems(QueueItem[] items)
    {
        // do your thing.
    }
}


public void Test()
{
    MyQueue queue = new MyQueue();

    QueueItem item = new QueueItem();
    queue.Enqueue(item);

    QueueItem[] batch = new QueueItem[]
    {
        new QueueItem(),
        new QueueItem()
    };

    queue.Enqueue(batch);

    // even on dispose, all queued items will be processed in order to stop the QueueHandler.
    queue.Dispose();
}
于 2013-09-19T07:33:17.993 に答える
-1

BlockingCollection は Queue よりも優れていると思います。それ以外の場合は、キューのサイズを継続的にチェックする (そしてゼロのときにスレッドを一時停止する) ことは、まったく問題のないアプローチです。

ところで、ここでは生産者と消費者のパターンについて話しています。他のアプローチについては、グーグルで検索できると思います。

于 2013-09-19T07:05:44.597 に答える