7

私は最近 LMAX Disruptor について学び、いくつかの実験を行っています。私を困惑させていることの1つは、のハンドラーメソッドのendOfBatchパラメーターです。次のコードを検討してください。まず、私が呼び出すダミーのメッセージとコンシューマー クラスと:onEventEventHandlerTest1Test1Worker

public class Test1 {

}

public class Test1Worker implements EventHandler<Test1>{
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try{
            Thread.sleep(500);
        }
        catch(Exception e){
            e.printStackTrace();
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}

実際の作業の代わりに 500 ミリ秒の遅延を設定したことに注意してください。また、コンソールにシーケンス番号を出力しています

そして、(プロデューサーとして機能している)ドライバークラスが呼び出されましたDisruptorTest

public class DisruptorTest {

    private static Disruptor<Test1> bus1;

    private static ExecutorService test1Workers;

    public static void main(String[] args){             
        test1Workers = Executors.newFixedThreadPool(1);

        bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);           
        bus1.handleEventsWith(new Test1Worker());
        RingBuffer<Test1> buf1 = bus1.start();

        for (int i = 0; i < 10; i++){
            long a = System.currentTimeMillis();
            long next = buf1.next();
            long b = System.currentTimeMillis();
            System.out.println("Delay for claiming slot " + i + " is "+ (b - a));
            try {
                Test1 message = buf1.get(next);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                buf1.publish(next);
            }
        }
    }

    public static class Test1Factory implements EventFactory<Test1> {
        public Test1 newInstance() {
            return new Test1();
        }

    }   
}

ここでは、必要なものを初期化した後、10 個のメッセージを (バッファ サイズ 8) にフィードしRingBuffer、いくつかのことを監視しようとしています - の次のスロットを要求するためのプロデューサーの遅延とRingBuffer、コンシューマー側、および特定のシーケンスがバッチの最後と見なされているかどうか。

さて、興味深いことに、各メッセージの処理に 500 ミリ秒の遅延が関係しているため、これが出力として得られるものです。

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true

ただし、500 ミリ秒の待機時間を削除すると、次のようになります。

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true  

したがって、特定のメッセージがバッチの最後にあると見なされるかどうか (つまり、バッチのサイズ) は、コンシューマーのメッセージ処理の遅延に影響されているようです。ここで私はばかげているかもしれませんが、それはどうあるべきですか?その背後にある理由は何ですか?とにかく、一般的にバッチサイズを決定するものは何ですか? 前もって感謝します。私の質問で不明な点があれば教えてください。

4

1 に答える 1

8

バッチ サイズは、利用可能な要素の数によってのみ決定されます。したがって、その時点で利用可能な要素がさらにある場合は、それがバッチに含まれます。たとえば、Disruptor がコードを呼び出し、キューに要素が 1 つしかない場合、endOfBatch=true で 1 つの呼び出しを取得します。キューに 8 つの要素がある場合、8 つすべてを収集して 1 つのバッチで送信します。

以下のコードで、キュー内の「使用可能な」エントリの数が取得され、「次の」アイテムよりも多くなる可能性があることがわかります。たとえば、現在 5 で、スロット 6 を待っているときに 3 つのイベントが到着し、利用可能なイベントが 8 になり、バッチで複数の呼び出し (6、7、8) を受信するとします。

https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L124

final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

要素 9 での 500 ミリ秒の一時停止に関しては、Disruptor がリング バッファーで構築されており、バッファー内のスロット数を 8 に指定していることに注意してください (ここで 2 番目のパラメーターを参照してください)。

bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);  

すべてのコンシューマーが要素を消費したわけではなく、リングバッファーが容量に達している (8 つの要素すべてがいっぱい) 場合、プロデューサーは新しいイベントをバッファーにポストすることをブロックされます。バッファー サイズを 200 万オブジェクトに増やすか、コンシューマーがプロデューサーよりも高速であることを確認して、キューがいっぱいにならないようにすることができます (既に実証したスリープを削除します)。

于 2015-11-15T06:27:02.747 に答える