同ページより
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
読み取るオフセットを見つけることを示しています
Kafka には、役立つ 2 つの定数が含まれています。kafka.api.OffsetRequest.EarliestTime() は、ログ内のデータの先頭を見つけて、そこからストリーミングを開始します。kafka.api.OffsetRequest.LatestTime() は、新しいメッセージのみをストリーミングします。メッセージは時間の経過とともにログから削除されるため、オフセット 0 が開始オフセットであると想定しないでください。