Disruptorは、BlockingCollectionよりもはるかに高速であると想定されています。
私の前の質問で、なぜ私のディスラプターの例はとても遅いのですか?私は2つのテストを書きました。Disruptor
BlockingCollectionが約14マイクロ秒を費やしたのに対し、約1マイクロ秒(またはそれ以下)を費やしました。
そのため、プログラムで使用することにしましたが、実装すると、BlockingCollectionがまだマイクロ秒を費やしているのに、約マイクロ秒を費やしてDisruptor
いることがわかりました。Disruptor
50
14-18
プロダクションコードを「スタンドアロンテスト」に変更しましたが、Disruptor
それでも50マイクロ秒を費やしています。なんで?
簡略化したテストを以下に示します。このテストでは、2つのオプションがあります。最初のオプションはSleep for 1 ms
です。次にDisruptor
、配信に30〜50マイクロ秒を費やします。2番目のオプションは、アクティビティをシミュレートすることです。次にDisruptor
、配信に7マイクロ秒を費やします。BlockingCollection
14〜18マイクロ秒の結果で同じテスト。では、なぜDisruptorはBlockingCollectionよりも高速ではないのでしょうか。
私の実際のアプリケーションでは、 Disruptor
50マイクロ秒を費やして、多すぎるものを配信します。1マイクロ秒よりもはるかに速くメッセージを配信するはずです。
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
namespace DisruptorTest
{
public sealed class ValueEntry
{
internal int Id { get; set; }
}
class DisruptorTest
{
public class MyHandler : IEventHandler<ValueEntry>
{
private DisruptorTest _parent;
public MyHandler(DisruptorTest parent)
{
this._parent = parent;
}
public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
{
_parent.sw.Stop();
long microseconds = _parent.sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
// Filter out abnormal delays > 1000
if (microseconds < 1000)
{
_parent.sum += (int)microseconds;
_parent.count++;
if (_parent.count % 1000 == 0)
{
Console.WriteLine("average disruptor delay (microseconds) = {0}", _parent.sum / _parent.count);
}
}
}
}
private RingBuffer<ValueEntry> _ringBuffer;
private const int RingSize = 64;
static void Main(string[] args)
{
new DisruptorTest().Run();
}
public void Run()
{
var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingSize, TaskScheduler.Default);
disruptor.HandleEventsWith(new MyHandler(this));
_ringBuffer = disruptor.Start();
for (int i = 0; i < 10001; i++)
{
Do();
// We need to simulate activity to allow event to deliver
// Option1. just Sleep. Result 30-50 microseconds.
Thread.Sleep(1);
// Option2. Do something. Result ~7 microseconds.
//factorial = 1;
//for (int j = 1; j < 100000; j++)
//{
// factorial *= j;
//}
}
}
public static int factorial;
private Stopwatch sw = Stopwatch.StartNew();
private int sum;
private int count;
public void Do()
{
long sequenceNo = _ringBuffer.Next();
_ringBuffer[sequenceNo].Id = 0;
sw.Restart();
_ringBuffer.Publish(sequenceNo);
}
}
}
古いコード。今は無視する必要があります:
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
namespace DisruptorTest
{
public sealed class ValueEntry
{
internal int Id { get; set; }
}
class DisruptorTest
{
public class MyHandler : IEventHandler<ValueEntry>
{
private readonly int _ordinal;
private readonly int _consumers;
private DisruptorTest _parent;
public MyHandler(int ordinal, int consumers, DisruptorTest parent)
{
_ordinal = ordinal;
_consumers = consumers;
this._parent = parent;
}
public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
{
if ((sequence % _consumers) == _ordinal)
{
var id = data.Id;
_parent.sw[id].Stop();
long microseconds = _parent.sw[id].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
// filter out abnormal delays > 1000
if (microseconds < 1000)
{
_parent.sum[id] += (int)microseconds;
_parent.count[id]++;
if (_parent.count[id] % 10 == 0)
{
Console.WriteLine("Id = {0} average disruptor delay (microseconds) = {1}",
id, _parent.sum[id] / _parent.count[id]);
}
}
}
}
}
private const int NumberOfThreads = 1;
private RingBuffer<ValueEntry> _ringBuffer;
private const int RingSize = 64;
static void Main(string[] args)
{
new DisruptorTest().Run();
}
public void Run()
{
var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingSize, TaskScheduler.Default);
for (int i = 0; i < NumberOfThreads; i++)
disruptor.HandleEventsWith(new MyHandler(i, NumberOfThreads, this));
for (int i = 0; i < sw.Length; i++)
{
sw[i] = Stopwatch.StartNew();
}
_ringBuffer = disruptor.Start();
//var rnd = new Random();
for (int i = 0; i < 1000; i++)
{
//Do(rnd.Next(MaxId));
Do(i % MaxId);
Thread.Sleep(1);
}
}
private const int MaxId = 100;
private Stopwatch[] sw = new Stopwatch[MaxId];
private int[] sum = new int[MaxId];
private int[] count = new int[MaxId];
public void Do(int id)
{
long sequenceNo = _ringBuffer.Next();
_ringBuffer[sequenceNo].Id = id;
sw[id].Restart();
_ringBuffer.Publish(sequenceNo);
}
}
}
出力:
......
Id = 91 average disruptor delay (microseconds) = 50
Id = 92 average disruptor delay (microseconds) = 48
Id = 93 average disruptor delay (microseconds) = 35
Id = 94 average disruptor delay (microseconds) = 35
Id = 95 average disruptor delay (microseconds) = 51
Id = 96 average disruptor delay (microseconds) = 55
Id = 97 average disruptor delay (microseconds) = 38
Id = 98 average disruptor delay (microseconds) = 37
Id = 99 average disruptor delay (microseconds) = 45