2

コンシューマーのタイムアウトをサポートするコンカレント プロデューサー/コンシューマー コレクション (複数のプロデューサーとコンシューマー) を実装しようとしています。

現在、実際のコレクションはかなり複雑です (残念ながら System.Collections.Concurrent には何もありません) が、私の問題を示す最小限のサンプルがあります (少し似ていますBlockingCollection<T>)。

public sealed class ProducerConsumerQueueDraft<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object locker = new object();

    public void Enqueue(T item)
    {
        lock (locker)
        {
            queue.Enqueue(item);

            /* This "optimization" is broken, as Nicholas Butler points out.
            if(queue.Count == 1) // Optimization
            */
                Monitor.Pulse(locker); // Notify any waiting consumer threads.
        }
    }

    public T Dequeue(T item)
    {
        lock (locker)
        {
            // Surprisingly, this needs to be a *while* and not an *if*
            // which is the core of my problem.
            while (queue.Count == 0)
                Monitor.Wait(locker);

            return queue.Dequeue();
        }
    }

    // This isn't thread-safe, but is how I want TryDequeue to look.
    public bool TryDequeueDesired(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
            {
                item = default(T);
                return false;
            }

            // This is wrong! The queue may be empty even though we were pulsed!
            item = queue.Dequeue();
            return true;
        }
    }

    // Has nasty timing-gymnastics I want to avoid.
    public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            var watch = Stopwatch.StartNew();
            while (queue.Count == 0)
            {
                var remaining = timeout - watch.Elapsed;

                if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
                {
                    item = default(T);
                    return false;
                }
            }
            item = queue.Dequeue();
            return true;
        }
    }
}

アイデアは簡単です: 空のキューを見つけた消費​​者は通知されるのを待ち、生産者Pulse(注:PulseAll非効率的です) は待っているアイテムを通知します。

私の問題は、次のプロパティです Monitor.Pulse

Pulse を呼び出したスレッドがロックを解放すると、準備完了キュー内の次のスレッド(必ずしもパルスされたスレッドとは限りません)がロックを取得します。

これが意味することは、コンシューマ スレッド C1 がプロデューサー スレッドによって起動されてアイテムを消費する可能性があるが、別のコンシューマ スレッド C2 が C1 がロックを再取得する機会を得る前にロックを取得し、アイテムを消費して、C1 に制御が与えられたときの空のキュー。

つまり、キューが実際に空でない場合はパルスごとに消費者コードを防御的にチェックインし、そうでない場合は戻って手ぶらで待機する必要があります。

これに関する私の主な問題は、非効率的であるということです。スレッドは、作業を行うために起動され、すぐに送り返されて再び待機する可能性があります。これに関連TryDequeueして、 をタイムアウト付きで実装するTryDequeueThatWorksことは、洗練されている必要がある ( を参照) 場合に、不必要に難しく、非効率的です ( を参照TryDequeueDesired)。

どうすればMonitor.Pulse自分のやりたいことをひねることができますか? あるいは、そうする別の同期プリミティブはありますか? TryDequeue私が行ったことよりもタイムアウトを実装するためのより効率的かつ/またはエレガントな方法はありますか?

参考までに、これが私の希望するソリューションの問題を示すテストです。

var queue = new ProducerConsumerQueueDraft<int>();

for (int consumer = 0; consumer < 3; consumer++)
    new Thread(() =>
    {
        while (true)
        {
            int item;

            // This call should occasionally throw an exception.
            // Switching to queue.TryDequeueThatWorks should make
            // the problem go away.
            if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
            {
                // Do nothing.
            }
        }

    }).Start();

Thread.Sleep(1000); // Let consumers get up and running

for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
    queue.Enqueue(0);
}
4

3 に答える 3

2

これに関する私の主な問題は、非効率的であるということです

そうではない。これはよくあることだと思いますが、この種のレースはめったに起こりません。せいぜいブルームーンに一度。while ループは、問題が発生したときに問題が発生しないようにするために必要です。そして、そうなるでしょう。それを台無しにしないでください。

実際には逆で、ロックの設計はレースが発生することを可能にするため効率的です。そして対処します。レースは頻繁に発生しないため、ロックの設計をいじるのは非常に危険です。それらはひどくランダムであるため、変更が失敗を引き起こさないことを証明するのに十分なテストができません。計測コードを追加しても機能せず、タイミングが変更されます。

于 2012-05-20T15:54:04.193 に答える
1

私はこれについて役立つかもしれない記事を書きました:

スレッド同期: Wait と Pulse の謎を解く

特に、whileループが必要な理由について説明します。

于 2012-05-20T16:00:42.200 に答える
1

これは、単純なキーベースの合成プロデューサー/コンシューマー キューです。

public class ConflatingConcurrentQueue<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, Entry> entries;
    private readonly BlockingCollection<Entry> queue;

    public ConflatingConcurrentQueue()
    {
        this.entries = new ConcurrentDictionary<TKey, Entry>();
        this.queue = new BlockingCollection<Entry>();
    }

    public void Enqueue(TValue value, Func<TValue, TKey> keySelector)
    {
        // Get the entry for the key. Create a new one if necessary.
        Entry entry = entries.GetOrAdd(keySelector(value), k => new Entry());

        // Get exclusive access to the entry.
        lock (entry)
        {
            // Replace any old value with the new one.
            entry.Value = value;

            // Add the entry to the queue if it's not enqueued yet.
            if (!entry.Enqueued)
            {
                entry.Enqueued = true;
                queue.Add(entry);
            }
        }
    }

    public bool TryDequeue(out TValue value, TimeSpan timeout)
    {
        Entry entry;

        // Try to dequeue an entry (with timeout).
        if (!queue.TryTake(out entry, timeout))
        {
            value = default(TValue);
            return false;
        }

        // Get exclusive access to the entry.
        lock (entry)
        {
            // Return the value.
            value = entry.Value;

            // Mark the entry as dequeued.
            entry.Enqueued = false;
            entry.Value = default(TValue);
        }

        return true;
    }

    private class Entry
    {
        public TValue Value { get; set; }
        public bool Enqueued { get; set; }
    }
}

(これには 1 ~ 2 回のコード レビューが必要になる場合がありますが、一般的には問題ないと思います。)

于 2012-05-20T15:59:07.083 に答える