3 ノードの kafka クラスターをセットアップしています。私は嵐を使ってカフカからのメッセージを読んでいます。システムの各トピックには 7 つのパーティションがあります。
今、私は奇妙な問題に直面しています。3日前までは、すべて正常に機能していました。ただし、現在、ストーム トポロジが 2 つのパーティション (#1 と #4) から読み取ることができないようです。
問題を掘り下げてみたところ、私の kafka ログで、これらのパーティションの両方で、1 つのオフセットが欠落していることがわかりました。つまり、5964511 の後に、次のオフセットは 5964512 ではなく 5964513 です。
オフセットが見つからないため、Simple Consumer は次のオフセットに進むことができません。私は何か間違ったことをしていますか、それとも既知のバグですか?
そのような行動の理由は何でしょうか?
次のコードを使用して、有効なオフセットのウィンドウを読み取ります。
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
OffsetRequest request = new OffsetRequest( requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] validOffsets = response.offsets(topic, partition);
for (long validOffset : validOffsets) {
System.out.println(validOffset + " : ");
}
long largestOffset = validOffsets[0];
long smallestOffset = validOffsets[validOffsets.length - 1];
System.out.println(smallestOffset + " : " + largestOffset );
return largestOffset;
}
これにより、次の出力が得られます。
4529948 : 6000878
したがって、私が提供しているオフセットは十分にオフセット範囲内にあります。