7

この例ではhttps://stackoverflow.com/a/9980346/93647とここでは、なぜ私のディスラプターの例はとても遅いのですか? (質問の最後に)アイテムを公開するパブリッシャーが 1 人、消費者が 1 人います。

しかし、私の場合、消費者の仕事ははるかに複雑で、時間がかかります。したがって、データを並行して処理する 4 つのコンシューマーが必要です。

たとえば、プロデューサーが数字を生成する場合: 1,2,3,4,5,6,7,8,9,10,11..

consumer1 が 1,5,9 をキャッチし、consumer2 が 2,6,10 をキャッチし、consumer3 が 3,7,11 をキャッチし、consumer4 が 4,8,12... をキャッチします (正確にはこれらの数値ではありません。アイデアは、データを並行して処理する必要があるということです。どの特定の数値がどの消費者で処理されるかは気にしません)

実際のアプリケーションでは、消費者の作業はかなり高価であるため、これは並行して行う必要があることを覚えておいてください。マルチコアシステムの力を利用するために、消費者は異なるスレッドで実行されることを期待しています。

もちろん、4 つのリングバッファーを作成し、1 つのコンシューマーを 1 つのリングバッファーにアタッチすることもできます。このようにして、元の例を使用できます。しかし、それは正しくないだろうと私は感じています。おそらく、1 つのパブリッシャー (1 つのリングバッファー) と 4 つのコンシューマーを作成するのが正しいでしょう - これが私が必要としているものです。

Google グループの非常に似たような質問へのリンクを追加: https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

したがって、次の 2 つのオプションがあります。

  • 1 つのリングで多数のコンシューマー (各コンシューマーは追加のたびに「ウェイクアップ」し、すべてのコンシューマーは同じ WaitStrategy を持つ必要があります)
  • 多くの「1 つのリング - 1 つのコンシューマ」 (各コンシューマは、処理する必要があるデータでのみウェイクアップします。各コンシューマは独自の WaitStrategy を持つことができます)。
4

2 に答える 2

2

編集:コードがFAQから部分的に取られていることを言及するのを忘れていました。このアプローチがフランクの提案よりも良いか悪いかはわかりません。

このプロジェクトは文書化されていません。
とにかく、(最初のリンクに基づいて)次のスニップを試してください-モノラルでテストされ、問題ないようです:

using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }
    }

    public class MyHandler : IEventHandler<ValueEntry>
    {
        private static int _consumers = 0;
        private readonly int _ordinal;

        public MyHandler()
        {
            this._ordinal = _consumers++;
        }

        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            if ((sequence % _consumers) == _ordinal)
                Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
            else
                Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);                     
        }
    }

    class Program
    {
        private static readonly Random _random = new Random();
        private const int SIZE = 16;  // Must be multiple of 2
        private const int WORKERS = 4; 

        static void Main()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
            for (int i=0; i < WORKERS; i++)
                disruptor.HandleEventsWith(new MyHandler());
            var ringBuffer = disruptor.Start();

            while (true)
            {
                long sequenceNo = ringBuffer.Next();
                ringBuffer[sequenceNo].Value =  _random.Next();;
                ringBuffer.Publish(sequenceNo);
                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
                Console.ReadKey();
            }
        }
    }
}
于 2012-11-12T06:57:17.773 に答える
0

リングバッファの仕様から、すべての消費者があなたのを処理しようとすることがわかりますValueEvent。あなたの場合、あなたはそれを必要としません。

私はそれを次のように解決しました:

処理済みのフィールドを追加しValueEvent、コンシューマーがそのフィールドでテストするイベントを取得すると、既に処理されている場合は次のフィールドに移動します。

最もきれいな方法ではありませんが、それがバッファの仕組みです。

于 2012-11-12T05:36:21.077 に答える