バイトごとに生成および消費する代わりに、チャンクで作業する場合、おそらくはるかに高速化されます。その場合、コードの「ロックフリー」はおそらくまったく問題になりません。実際、従来のロックソリューションの方が望ましい場合があります。実演してみます。
C# では、ロックのない、単一のプロデューサー、単一のコンシューマー、制限付きキューが提供されます。(リスト A)
難解なインターロック操作はなく、明示的なメモリ バリアさえありません。一見したところ、これは最高に高速でロックフリーであるとしましょう。ではない?ここで、Marc Gravell が
提供したロック ソリューションと比較してみましょう。
コア間で共有 L3 キャッシュを持たないデュアル CPU マシンを使用します。最大で 2 倍の高速化が期待されます。2 倍のスピードアップは、ロックフリー ソリューションが理論上の限界で理想的に機能することを意味します。
ロックフリー コードの理想的な環境を作るために、ここからユーティリティ クラスを使用して、プロデューサー スレッドとコンシューマー スレッドの CPU アフィニティも設定します。
テストの結果のコードを (リスト B) に示します。
それはcaを生産しています。あるスレッドで 10MBytes を別のスレッドで消費します。
キューのサイズは 32KBytes に固定されています。満杯の場合、プロデューサーは待機します。
私のマシンでの典型的なテストの実行は次のようになります。
LockFreeByteQueue: 799ms
ByteQueue: 1843ms
ロックフリー キューは高速です。うわー、それは2倍以上の速さです!それは自慢できることです。:)
何が起こっているのか見てみましょう。Marc のロッキング キューはまさにそれを行います。ロックします。バイトごとにこれを行います。
バイトごとにロックして、データをバイトごとにプッシュする必要は本当にあるのでしょうか? ネットワーク上にチャンクで到着することはほぼ間違いありません (約 1k パケットのように)。内部ソースから実際にバイトごとに到着したとしても、プロデューサーは簡単にそれを適切なチャンクにパッケージ化できます。
それをやってみましょう - バイトごとに生成して消費する代わりに、チャンクで作業し、他の 2 つのテストをマイクロベンチマークに追加しましょう (リスト C、ベンチマーク本体に挿入するだけです)。
典型的な実行は次のようになります。
LockFreePageQueue: 33ms
PageQueue: 25ms
現在、どちらも実際には元のロックフリー コードよりも 20 倍高速です。チャンクを追加したMarc のソリューションは、チャンキングを使用したロックフリー コードよりも実際に高速です。
2 倍のスピードアップをもたらすロックのない構造を採用する代わりに、ロックで問題なく機能する別のソリューションを試みた結果、20 倍 (!) のスピードアップが得られました。
多くの問題の鍵は、ロックを回避することではなく、共有を回避してロックを最小限に抑えることです。上記の場合、バイトコピー中は共有を避けることができます。
ほとんどの場合、プライベート構造で作業してから、単一のポインターをキューに入れることができます。これにより、共有スペースと時間をキューへの単一のポインターの単一の挿入に縮小できます。
リスト A、ロックフリー、単一プロデューサー、単一コンシューマー キュー:
public class BoundedSingleProducerSingleConsumerQueue<T>
{
T[] queue;
volatile int tail;
volatile int head;
public BoundedSingleProducerSingleConsumerQueue(int capacity)
{
queue = new T[capacity + 1];
tail = head = 0;
}
public bool TryEnqueue(T item)
{
int newtail = (tail + 1) % queue.Length;
if (newtail == head) return false;
queue[tail] = item;
tail = newtail;
return true;
}
public bool TryDequeue(out T item)
{
item = default(T);
if (head == tail) return false;
item = queue[head];
queue[head] = default(T);
head = (head + 1) % queue.Length;
return true;
}
}
リスト B、マイクロベンチマーク:
class Program
{
static void Main(string[] args)
{
for (int numtrials = 3; numtrials > 0; --numtrials)
{
using (ProcessorAffinity.BeginAffinity(0))
{
int pagesize = 1024 * 10;
int numpages = 1024;
int totalbytes = pagesize * numpages;
BoundedSingleProducerSingleConsumerQueue<byte> lockFreeByteQueue = new BoundedSingleProducerSingleConsumerQueue<byte>(1024 * 32);
Stopwatch sw = new Stopwatch();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
using (ProcessorAffinity.BeginAffinity(1))
{
for (int i = 0; i < totalbytes; i++)
{
while (!lockFreeByteQueue.TryEnqueue((byte)(i & 0xFF))) ;
}
}
});
for (int i = 0; i < totalbytes; i++)
{
byte tmp;
while (!lockFreeByteQueue.TryDequeue(out tmp)) ;
}
sw.Stop();
Console.WriteLine("LockFreeByteQueue: {0}ms", sw.ElapsedMilliseconds);
SizeQueue<byte> byteQueue = new SizeQueue<byte>(1024 * 32);
sw.Reset();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
using (ProcessorAffinity.BeginAffinity(1))
{
for (int i = 0; i < totalbytes; i++)
{
byteQueue.Enqueue((byte)(i & 0xFF));
}
}
});
for (int i = 0; i < totalbytes; i++)
{
byte tmp = byteQueue.Dequeue();
}
sw.Stop();
Console.WriteLine("ByteQueue: {0}ms", sw.ElapsedMilliseconds);
Console.ReadKey();
}
}
}
}
リスト C、チャンク テスト:
BoundedSingleProducerSingleConsumerQueue<byte[]> lockfreePageQueue = new BoundedSingleProducerSingleConsumerQueue<byte[]>(32);
sw.Reset();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
using (ProcessorAffinity.BeginAffinity(1))
{
for (int i = 0; i < numpages; i++)
{
byte[] page = new byte[pagesize];
for (int j = 0; j < pagesize; j++)
{
page[j] = (byte)(i & 0xFF);
}
while (!lockfreePageQueue.TryEnqueue(page)) ;
}
}
});
for (int i = 0; i < numpages; i++)
{
byte[] page;
while (!lockfreePageQueue.TryDequeue(out page)) ;
for (int j = 0; j < pagesize; j++)
{
byte tmp = page[j];
}
}
sw.Stop();
Console.WriteLine("LockFreePageQueue: {0}ms", sw.ElapsedMilliseconds);
SizeQueue<byte[]> pageQueue = new SizeQueue<byte[]>(32);
ThreadPool.QueueUserWorkItem(delegate(object state)
{
using (ProcessorAffinity.BeginAffinity(1))
{
for (int i = 0; i < numpages; i++)
{
byte[] page = new byte[pagesize];
for (int j = 0; j < pagesize; j++)
{
page[j] = (byte)(i & 0xFF);
}
pageQueue.Enqueue(page);
}
}
});
sw.Reset();
sw.Start();
for (int i = 0; i < numpages; i++)
{
byte[] page = pageQueue.Dequeue();
for (int j = 0; j < pagesize; j++)
{
byte tmp = page[j];
}
}
sw.Stop();
Console.WriteLine("PageQueue: {0}ms", sw.ElapsedMilliseconds);