0

Disruptorは、BlockingCollectionよりもはるかに高速であると想定されています。

私の前の質問で、なぜ私のディスラプターの例はとても遅いのですか?私は2つのテストを書きました。DisruptorBlockingCollectionが約14マイクロ秒を費やしたのに対し、約1マイクロ秒(またはそれ以下)を費やしました。

そのため、プログラムで使用することにしましたが、実装すると、BlockingCollectionがまだマイクロ秒を費やしているのに、約マイクロ秒を費やしてDisruptorいることがわかりました。Disruptor5014-18

プロダクションコードを「スタンドアロンテスト」に変更しましたが、Disruptorそれでも50マイクロ秒を費やしています。なんで?

簡略化したテストを以下に示します。このテストでは、2つのオプションがあります。最初のオプションはSleep for 1 msです。次にDisruptor、配信に30〜50マイクロ秒を費やします。2番目のオプションは、アクティビティをシミュレートすることです。次にDisruptor、配信に7マイクロ秒を費やします。BlockingCollection14〜18マイクロ秒の結果で同じテスト。では、なぜDisruptorはBlockingCollectionよりも高速ではないのでしょうか。

私の実際のアプリケーションでは、 Disruptor50マイクロ秒を費やして、多すぎるものを配信します。1マイクロ秒よりもはるかに速くメッセージを配信するはずです。

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class DisruptorTest
    {

        public class MyHandler : IEventHandler<ValueEntry>
        {
            private DisruptorTest _parent;

            public MyHandler(DisruptorTest parent)
            {
                this._parent = parent;
            }

            public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
            {
                _parent.sw.Stop();
                long microseconds = _parent.sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));

                // Filter out abnormal delays > 1000
                if (microseconds < 1000)
                {
                    _parent.sum += (int)microseconds;
                    _parent.count++;
                    if (_parent.count % 1000 == 0)
                    {
                        Console.WriteLine("average disruptor delay (microseconds) = {0}", _parent.sum / _parent.count);
                    }
                }
            }
        }

        private RingBuffer<ValueEntry> _ringBuffer;
        private const int RingSize = 64;

        static void Main(string[] args)
        {
            new DisruptorTest().Run();
        }

        public void Run()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingSize, TaskScheduler.Default);
            disruptor.HandleEventsWith(new MyHandler(this));

            _ringBuffer = disruptor.Start();

            for (int i = 0; i < 10001; i++)
            {
                Do();

                // We need to simulate activity to allow event to deliver

                // Option1. just Sleep. Result 30-50 microseconds.
                Thread.Sleep(1);

                // Option2. Do something. Result ~7 microseconds.
                //factorial = 1;
                //for (int j = 1; j < 100000; j++)
                //{
                //    factorial *= j;
                //}
            }
        }

        public static int factorial;

        private Stopwatch sw = Stopwatch.StartNew();
        private int sum;
        private int count;

        public void Do()
        {
            long sequenceNo = _ringBuffer.Next();
            _ringBuffer[sequenceNo].Id = 0;
            sw.Restart();
            _ringBuffer.Publish(sequenceNo);
        }

    }
}

古いコード。今は無視する必要があります:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class DisruptorTest
    {

        public class MyHandler : IEventHandler<ValueEntry>
        {
            private readonly int _ordinal;
            private readonly int _consumers;
            private DisruptorTest _parent;

            public MyHandler(int ordinal, int consumers, DisruptorTest parent)
            {
                _ordinal = ordinal;
                _consumers = consumers;
                this._parent = parent;
            }

            public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
            {
                if ((sequence % _consumers) == _ordinal)
                {
                    var id = data.Id;
                    _parent.sw[id].Stop();
                    long microseconds = _parent.sw[id].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
                    // filter out abnormal delays > 1000
                    if (microseconds < 1000)
                    {
                        _parent.sum[id] += (int)microseconds;
                        _parent.count[id]++;
                        if (_parent.count[id] % 10 == 0)
                        {
                            Console.WriteLine("Id = {0} average disruptor delay (microseconds) = {1}",
                                id, _parent.sum[id] / _parent.count[id]);
                        }
                    }
                }
            }
        }

        private const int NumberOfThreads = 1;
        private RingBuffer<ValueEntry> _ringBuffer;
        private const int RingSize = 64;

        static void Main(string[] args)
        {
            new DisruptorTest().Run();
        }

        public void Run()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingSize, TaskScheduler.Default);
            for (int i = 0; i < NumberOfThreads; i++)
                disruptor.HandleEventsWith(new MyHandler(i, NumberOfThreads, this));

            for (int i = 0; i < sw.Length; i++)
            {
                sw[i] = Stopwatch.StartNew();
            }

            _ringBuffer = disruptor.Start();

            //var rnd = new Random();
            for (int i = 0; i < 1000; i++)
            {
                //Do(rnd.Next(MaxId));
                Do(i % MaxId);
                Thread.Sleep(1);
            }
        }

        private const int MaxId = 100;

        private Stopwatch[] sw = new Stopwatch[MaxId];
        private int[] sum = new int[MaxId];
        private int[] count = new int[MaxId];

        public void Do(int id)
        {
            long sequenceNo = _ringBuffer.Next();
            _ringBuffer[sequenceNo].Id = id;
            sw[id].Restart();
            _ringBuffer.Publish(sequenceNo);
        }

    }
}

出力:

......
Id = 91 average disruptor delay (microseconds) = 50
Id = 92 average disruptor delay (microseconds) = 48
Id = 93 average disruptor delay (microseconds) = 35
Id = 94 average disruptor delay (microseconds) = 35
Id = 95 average disruptor delay (microseconds) = 51
Id = 96 average disruptor delay (microseconds) = 55
Id = 97 average disruptor delay (microseconds) = 38
Id = 98 average disruptor delay (microseconds) = 37
Id = 99 average disruptor delay (microseconds) = 45
4

2 に答える 2

4

1 つのアイテムを公開するのにかかる時間を測定しています。

public void Do(int id)
{
    long sequenceNo = _ringBuffer.Next();
    _ringBuffer[sequenceNo].Id = id;
    sw[id].Restart(); // <--- You're doing this EVERY TIME YOU PUBLISH an item!
    _ringBuffer.Publish(sequenceNo);
}

Stopwatch前回の質問で、精度を適切に利用するには、何千ものパブリッシュを測定する必要があると通知されました。

さらに、テストの途中でまだコンソールに書き込んでいます。これを避ける:

if (_parent.count[id] % 10 == 0)
{
    Console.WriteLine("Id = {0} average disruptor delay (microseconds) = {1}",
        id, _parent.sum[id] / _parent.count[id]);
}

コードをクリーンアップする

少なくとも、コードを少しきれいにするようにしてください。少し整理し直したので、それほど面倒ではありません: http://pastie.org/5382971

Disrputor は、最初はそれほど単純ではありません。今度は、コードを処理し、それを修正する方法を説明する必要があります。さらに重要なことは、スパゲッティ コードがある場合、パフォーマンスの最適化やテストを行うことができないということです。すべてをシンプルかつクリーンに保つようにしてください。この段階では、コードは単純でもクリーンでもありません。

プライベート メンバー変数の恐ろしい命名規則から始めましょう。

private const int NumberOfThreads = 1;
private RingBuffer<ValueEntry> _ringBuffer;
private const int RingSize = 64;
private const int MaxId = 100
private Stopwatch[] sw = new Stopwatch[MaxId];
private int[] sum = new int[MaxId];
private int[] count = new int[MaxId];

一貫性を保つ:

private const int _numberOfThreads = 1;
private RingBuffer<ValueEntry> _ringBuffer;
private const int _ringSize = 64;
private const int _maxId = 100
private Stopwatch[] _sw = new Stopwatch[MaxId];
private int[] _sum = new int[MaxId];
private int[] _count = new int[MaxId];

その他のヒント:

  • ネストされたクラスを取り除きます。
  • メインを別のクラス (Program など) に移動します。

良いテストを構築する

Martin と Michael が最初に言うことの 1 つは、パフォーマンス テストも非常に優れている必要があるため、テスト フレームワークの構築にかなりの時間を費やしたということです。

  • 1000 イベントではなく、数百万イベントを試すことをお勧めします。
  • すべてのイベントに対して 1 つのタイマーのみを使用するようにしてください。
  • アイテムの処理を開始したらタイマーを開始し、処理するアイテムがなくなったらタイマーを停止します。
  • アイテムの処理がいつ終了したかを知る効率的な方法は、CountDownEvent.

アップデート

ストップウォッチの精度は確かに十分なはずです。

Int64 frequency = Stopwatch.Frequency;
Console.WriteLine( "  Timer frequency in ticks per second = {0}", frequency );
Int64 nanosecPerTick = (1000L * 1000L * 1000L) / frequency;
Console.WriteLine( "  Timer is accurate within {0} nanoseconds", nanosecPerTick );

私のマシンでは、解像度は 320 ナノ秒以内です。したがって、タイマーの解像度が問題にならないというOPは正しいです。

OPが平均的な1つのアイテムの配送を測定したいことは理解していますが、それを行うには(少なくとも)2つの方法があります。

その違いを調べなければなりません。概念レベルでは、以下のコードとまったく同じことを行っています。

  1. あなたはたくさんの反復を実行しています。
  2. それらのすべてを測定します。
  3. 合計を計算します。
  4. 最後に平均を計算します。

コード内:

Stopwatch sw = new Stopwatch();
long totalMicroseconds = 0;
int numItems = 1000;
for(int i = 0; i < numItems; i++)
{
    sw.Reset();
    sw.Start();
    OneItemDelivery();
    sw.Stop();
    totalMicroseconds += sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
}
long avgOneItemDelivery = totalMicroseconds/numItems;

パフォーマンスを測定する別の方法は次のとおりです。

  1. タイマーを開始します。
  2. すべての反復を実行します。
  3. タイマーを停止します。
  4. 平均時間を計算します。

コード内:

sw.Start();
for(int i = 0; i < numItems; i++)
{
    OneItemDelivery();    
}
sw.Stop();
totalMicroseconds = sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
long avgOneItemDelivery = totalMicroseconds/numItems;

それぞれに独自の問題があります。

  • 最初の方法は精度が低くなる可能性があり、ストップウォッチがその少量の作業を正確に処理できることをシステムで証明する必要があります (単にナノ秒の精度を計算するだけではありません)。
  • 2 番目の方法には、反復が発生するのにかかる計算時間も含まれます。これにより、測定値にわずかな偏りが生じますが、最初の方法で通常見られる精度の問題に対処できます。

ステートメントを使用するとパフォーマンスが低下することは既にお気づきSleepでしょう。簡単な計算を行うことをお勧めします。階乗を計算するのは良い考えのように思えますが、非常に小さな計算にしてください。100000 は必要ありません。100 も問題ありません。

もちろん、テストのために 2 分間待つ必要はありませんが、10 ~ 20 秒は問題になりません。

于 2012-11-15T15:07:39.097 に答える