5

3 ノードの kafka クラスターをセットアップしています。私は嵐を使ってカフカからのメッセージを読んでいます。システムの各トピックには 7 つのパーティションがあります。

今、私は奇妙な問題に直面しています。3日前までは、すべて正常に機能していました。ただし、現在、ストーム トポロジが 2 つのパーティション (#1 と #4) から読み取ることができないようです。

問題を掘り下げてみたところ、私の kafka ログで、これらのパーティションの両方で、1 つのオフセットが欠落していることがわかりました。つまり、5964511 の後に、次のオフセットは 5964512 ではなく 5964513 です。

オフセットが見つからないため、Simple Consumer は次のオフセットに進むことができません。私は何か間違ったことをしていますか、それとも既知のバグですか?

そのような行動の理由は何でしょうか?

次のコードを使用して、有効なオフセットのウィンドウを読み取ります。

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
    OffsetRequest request = new OffsetRequest( requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    long[] validOffsets = response.offsets(topic, partition);
    for (long validOffset : validOffsets) {
        System.out.println(validOffset + " : ");
    }
    long largestOffset = validOffsets[0];
    long smallestOffset = validOffsets[validOffsets.length - 1];
    System.out.println(smallestOffset + " : " + largestOffset );
    return largestOffset;
}

これにより、次の出力が得られます。

4529948 : 6000878

したがって、私が提供しているオフセットは十分にオフセット範囲内にあります。

4

1 に答える 1

1

返事が遅くなり申し訳ありませんが...

この場合、次に読み取るオフセットを保持する Long インスタンス var を用意し、フェッチ後に、返された FetchResponse hasError() があるかどうかを確認することでコーディングします。エラーが発生した場合は、次のオフセット値を適切な値 (次のオフセットまたは最後に使用可能なオフセットである可能性があります) に変更して、再試行します。

于 2014-12-15T17:25:03.050 に答える