私は最近 LMAX Disruptor について学び、いくつかの実験を行っています。私を困惑させていることの1つは、のハンドラーメソッドのendOfBatch
パラメーターです。次のコードを検討してください。まず、私が呼び出すダミーのメッセージとコンシューマー クラスと:onEvent
EventHandler
Test1
Test1Worker
public class Test1 {
}
public class Test1Worker implements EventHandler<Test1>{
public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
try{
Thread.sleep(500);
}
catch(Exception e){
e.printStackTrace();
}
System.out.println("Received message with sequence " + sequence + ". "
+ "EndOfBatch = " + endOfBatch);
}
}
実際の作業の代わりに 500 ミリ秒の遅延を設定したことに注意してください。また、コンソールにシーケンス番号を出力しています
そして、(プロデューサーとして機能している)ドライバークラスが呼び出されましたDisruptorTest
:
public class DisruptorTest {
private static Disruptor<Test1> bus1;
private static ExecutorService test1Workers;
public static void main(String[] args){
test1Workers = Executors.newFixedThreadPool(1);
bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);
bus1.handleEventsWith(new Test1Worker());
RingBuffer<Test1> buf1 = bus1.start();
for (int i = 0; i < 10; i++){
long a = System.currentTimeMillis();
long next = buf1.next();
long b = System.currentTimeMillis();
System.out.println("Delay for claiming slot " + i + " is "+ (b - a));
try {
Test1 message = buf1.get(next);
} catch (Exception e) {
e.printStackTrace();
} finally {
buf1.publish(next);
}
}
}
public static class Test1Factory implements EventFactory<Test1> {
public Test1 newInstance() {
return new Test1();
}
}
}
ここでは、必要なものを初期化した後、10 個のメッセージを (バッファ サイズ 8) にフィードしRingBuffer
、いくつかのことを監視しようとしています - の次のスロットを要求するためのプロデューサーの遅延とRingBuffer
、コンシューマー側、および特定のシーケンスがバッチの最後と見なされているかどうか。
さて、興味深いことに、各メッセージの処理に 500 ミリ秒の遅延が関係しているため、これが出力として得られるものです。
Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true
ただし、500 ミリ秒の待機時間を削除すると、次のようになります。
Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true
したがって、特定のメッセージがバッチの最後にあると見なされるかどうか (つまり、バッチのサイズ) は、コンシューマーのメッセージ処理の遅延に影響されているようです。ここで私はばかげているかもしれませんが、それはどうあるべきですか?その背後にある理由は何ですか?とにかく、一般的にバッチサイズを決定するものは何ですか? 前もって感謝します。私の質問で不明な点があれば教えてください。