2

ここで単純な消費者について述べたように

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

また、読み取られるオフセットが要求したオフセットより小さくないことを明示的にチェックしていることにも注意してください。Kafka がメッセージを圧縮している場合、要求されたオフセットが圧縮されたブロックの先頭でなくても、フェッチ要求は圧縮されたブロック全体を返すため、これが必要です。したがって、以前に見たメッセージが再び返される可能性があります。

最後に、読み取られたメッセージの数を追跡します。最後のリクエストで何も読み取らなかった場合は、1 秒間スリープ状態になるため、データがないときに Kafka を叩くことはありません。

私のプログラムのように、最初に古いメッセージの 1 つを読み取り、それが古いためスリープ状態になり、次に新しいレコードを読み取ります。

SimpleConsumer が新しいメッセージのみを読み取るようにするための回避策はありますか?

4

1 に答える 1

0

同ページより

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}

読み取るオフセットを見つけることを示しています

Kafka には、役立つ 2 つの定数が含まれています。kafka.api.OffsetRequest.EarliestTime() は、ログ内のデータの先頭を見つけて、そこからストリーミングを開始します。kafka.api.OffsetRequest.LatestTime() は、新しいメッセージのみをストリーミングします。メッセージは時間の経過とともにログから削除されるため、オフセット 0 が開始オフセットであると想定しないでください。

于 2013-08-23T22:27:33.330 に答える