29

ObjectPool<T>私の会社ではしばらくの間、コンテンツへのアクセスをブロックする自家製の実装を使用してきました。これは非常に簡単です。a Queue<T>objectロックオン、およびAutoResetEventアイテムが追加されたときに「借用」スレッドに通知するためのシグナルです。

クラスの要点は、実際には次の2つの方法です。

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

System.Collections.Concurrent4.0ではなく.NET3.5を使用しているため、によって提供されるものの代わりに、これと他のいくつかのコレクションクラスを使用しています。しかし最近、Reactive Extensionsを使用ているため、実際にはConcurrent(System.Threading.dllで)名前空間を使用できることがわかりました。

当然、名前空間BlockingCollection<T>のコアクラスの1つであるためConcurrent、私やチームメートが書いたものよりも優れたパフォーマンスを提供する可能性があると思いました。

そこで、非常に簡単に機能する新しい実装を書いてみました。

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

驚いたことに、いくつかの簡単なテスト(複数のスレッドから数千回プールに借用/戻る)によると、元の実装はパフォーマンスの点で大幅BlockingCollection<T>に優れています。どちらも正しく機能しているようです。元の実装の方がはるかに高速であるように見えるだけです。

私の質問:

  1. なぜこれでしょうか?おそらく、BlockingCollection<T>柔軟性が高く(ラッピングすることで機能することを理解していますIProducerConsumerCollection<T>)、パフォーマンスのオーバーヘッドが必然的に発生するためですか?
  2. BlockingCollection<T>これは、クラスの完全に誤った使用法ですか?
  3. これが適切な使用法である場合BlockingCollection<T>、私は適切に使用していないだけですか?たとえば、Take/Addアプローチは過度に単純化されており、同じ機能を取得するためのはるかに優れたパフォーマンスの方法がありますか?

その3番目の質問に答える洞察が誰かにない限り、今のところ元の実装に固執しているように見えます。

4

3 に答える 3

30

ここには、いくつかの潜在的な可能性があります。

まず、BlockingCollection<T>Reactive Extensionsにはバックポートがあり、.NET4の最終バージョンとまったく同じではありません。このバックポートのパフォーマンスが.NET4RTMと異なっていても驚かないでしょう(特にこのコレクションのプロファイルは作成していませんが)。TPLの多くは、.NET3.5バックポートよりも.NET4の方がパフォーマンスが優れています。

BlockingCollection<T>そうは言っても、単一のプロデューサースレッドと単一のコンシューマースレッドがある場合、実装のパフォーマンスは向上すると思います。1つのプロデューサーと1つのコンシューマーを使用すると、ロックによる全体的なパフォーマンスへの影響は小さくなります。リセットイベントは、コンシューマー側で待機するための非常に効果的な手段です。

ただし、BlockingCollection<T>多くのプロデューサースレッドがデータを非常にうまく「エンキュー」できるように設計されています。ロックの競合がかなり早く問題になり始めるため、これは実装ではうまく機能しません。

そうは言っても、ここで1つの誤解を指摘したいと思います。

...おそらく私や私のチームメートが書いたものよりも優れたパフォーマンスを提供するでしょう。

これはしばしば真実ではありません。フレームワークコレクションクラスは通常、非常に優れたパフォーマンスを発揮しますが、特定のシナリオで最もパフォーマンスの高いオプションではないことがよくあります。そうは言っても、それらは非常に柔軟で非常に堅牢でありながら、うまく機能する傾向があります。彼らはしばしば非常にうまくスケーリングする傾向があります。「自作」コレクションクラスは、特定のシナリオではフレームワークコレクションよりも優れていることがよくありますが、特別に設計されたシナリオ以外のシナリオで使用すると問題が発生する傾向があります。これはそのような状況の1つだと思います。

于 2010-06-14T18:25:31.090 に答える
14

BlockingCollection.Net 4でConurrentQueue/AutoResetEventコンボ(OPのソリューションに似ていますが、ロックレス)を試してみましたが、後者のコンボは私のユースケースでは非常に高速だったため、BlockingCollectionを廃止しました。残念ながら、これはほぼ1年前であり、ベンチマーク結果を見つけることができませんでした。

別のAutoResetEventを使用しても、それほど複雑になることはありません。実際、それを抽象化して、一度だけ、BlockingCollectionSlim...にまとめることもできます。

BlockingCollectionは内部的にもConcurrentQueueに依存していますが、スリムなセマフォキャンセルトークンを使用して追加のジャグリングを行います。これにより、追加の機能が得られますが、使用しない場合でもコストがかかります。BlockingCollectionはConcurrentQueueと結合されていませんが、IProducerConsumerCollection代わりに他の実装者と一緒に使用できることにも注意してください。


制限のない、かなり必要最低限​​のBlockingCollectionSlimの実装:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);
于 2015-03-26T00:39:33.730 に答える
2

.Net 4.7.2のBlockingCollectionで同じパフォーマンスの問題に遭遇し、この投稿を見つけました。私の場合はMultipleProducers-MultipleConsumersです。特に小さなデータチャンクは多くのソースから読み取られ、多くのフィルターで処理する必要があります。いくつかの(Env.ProcessorCount)BlockingCollectionsが使用され、BlockingCollection.GetConsumingEnumerable.MoveNext()実際のフィルタリングよりもCPU時間を消費するというパフォーマンスプロファイラーが表示されました。

コードをありがとう、@EugeneBeresovsky。参考:私の環境では、BlockingCollectionのほぼ2倍の速度でした。だから、これが私のSpinLockedBlockingCollectionです:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

readonlyまた、パフォーマンスが重要なコードの場合は、フィールド修飾子を避けることをお勧めします。IL内のすべてのフィールドアクセスにチェックを追加します。次のテストコードを使用

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

2コアでHTが有効になっているCorei5-3210Mでは、次の出力が得られます。

BlockingCollection 0.0006ms
BlockingCollectionSlim 0.0010ms(Eugene Beresovsky実装)
BlockingCollectionSpin 0.0003ms

したがって、SpinLockedバージョンは.Netより2倍高速ですBlockingCollection。しかし、私はそれだけを使用することをお勧めします!コードの単純さ(および保守性)よりもパフォーマンスを本当に好む場合。

于 2019-01-01T13:13:02.593 に答える