.Net 4.7.2のBlockingCollectionで同じパフォーマンスの問題に遭遇し、この投稿を見つけました。私の場合はMultipleProducers-MultipleConsumersです。特に小さなデータチャンクは多くのソースから読み取られ、多くのフィルターで処理する必要があります。いくつかの(Env.ProcessorCount)BlockingCollectionsが使用され、BlockingCollection.GetConsumingEnumerable.MoveNext()
実際のフィルタリングよりもCPU時間を消費するというパフォーマンスプロファイラーが表示されました。
コードをありがとう、@EugeneBeresovsky。参考:私の環境では、BlockingCollectionのほぼ2倍の速度でした。だから、これが私のSpinLockedBlockingCollectionです:
public class BlockingCollectionSpin<T>
{
private SpinLock _lock = new SpinLock(false);
private Queue<T> _queue = new Queue<T>();
public void Add(T item)
{
bool gotLock = false;
try
{
_lock.Enter(ref gotLock);
_queue.Enqueue(item);
}
finally
{
if (gotLock) _lock.Exit(false);
}
}
public bool TryPeek(out T result)
{
bool gotLock = false;
try
{
_lock.Enter(ref gotLock);
if (_queue.Count > 0)
{
result = _queue.Peek();
return true;
}
else
{
result = default(T);
return false;
}
}
finally
{
if (gotLock) _lock.Exit(false);
}
}
public T Take()
{
var spin = new SpinWait();
do
{
bool gotLock = false;
try
{
_lock.Enter(ref gotLock);
if (_queue.Count > 0)
return _queue.Dequeue();
}
finally
{
if (gotLock) _lock.Exit(false);
}
spin.SpinOnce();
} while (true);
}
}
readonly
また、パフォーマンスが重要なコードの場合は、フィールド修飾子を避けることをお勧めします。IL内のすべてのフィールドアクセスにチェックを追加します。次のテストコードを使用
private static void TestBlockingCollections()
{
const int workAmount = 10000000;
var workerCount = Environment.ProcessorCount * 2;
var sw = new Stopwatch();
var source = new long[workAmount];
var rnd = new Random();
for (int i = 0; i < workAmount; i++)
source[i] = rnd.Next(1000000);
var swOverhead = 0.0;
for (int i = 0; i < workAmount; i++)
{
sw.Restart();
swOverhead += sw.Elapsed.TotalMilliseconds;
}
swOverhead /= workAmount;
var sum1 = new long[workerCount];
var queue1 = new BlockingCollection<long>(10000);
var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
Task.Factory.StartNew(() =>
{
foreach (var l in queue1.GetConsumingEnumerable())
sum1[n] += l;
})).ToArray();
Thread.Sleep(1000);
sw.Restart();
foreach (var l in source)
queue1.Add(l);
queue1.CompleteAdding();
Task.WaitAll(workers);
var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);
var sum2 = new long[workerCount];
var queue2 = new BlockingCollectionSlim<long?>();
workers = Enumerable.Range(0, workerCount - 1).Select(n =>
Task.Factory.StartNew(() =>
{
long? l;
while ((l = queue2.Take()).HasValue)
sum2[n] += l.Value;
})).ToArray();
Thread.Sleep(1000);
sw.Restart();
foreach (var l in source)
queue2.Add(l);
for (int i = 0; i < workerCount; i++)
queue2.Add(null);
Task.WaitAll(workers);
elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);
var sum3 = new long[workerCount];
var queue3 = new BlockingCollectionSpin<long?>();
workers = Enumerable.Range(0, workerCount - 1).Select(n =>
Task.Factory.StartNew(() =>
{
long? l;
while ((l = queue3.Take()).HasValue)
sum3[n] += l.Value;
})).ToArray();
Thread.Sleep(1000);
sw.Restart();
foreach (var l in source)
queue3.Add(l);
for (int i = 0; i < workerCount; i++)
queue3.Add(null);
Task.WaitAll(workers);
elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);
if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
Console.WriteLine("Wrong sum in the end!");
Console.ReadLine();
}
2コアでHTが有効になっているCorei5-3210Mでは、次の出力が得られます。
BlockingCollection 0.0006ms
BlockingCollectionSlim 0.0010ms(Eugene Beresovsky実装)
BlockingCollectionSpin 0.0003ms
したがって、SpinLockedバージョンは.Netより2倍高速ですBlockingCollection
。しかし、私はそれだけを使用することをお勧めします!コードの単純さ(および保守性)よりもパフォーマンスを本当に好む場合。