37

Zookeeper を使用してカフカからデータを取得しています。ここでは、常に最後のオフセット ポイントからデータを取得します。古いデータを取得するためにオフセットの時間を指定する方法はありますか?

1 つのオプション autooffset.reset があります。最小または最大を受け入れます。最小と最大の意味を誰か教えてください。autooffset.reset は、最新のオフセット ポイントではなく古いオフセット ポイントからデータを取得するのに役立ちますか?

4

7 に答える 7

23

コンシューマーは常にグループに属し、各パーティションについて、Zookeeper はパーティション内のそのコンシューマー グループの進行状況を追跡します。

最初からフェッチするには、Hussain が参照したように、進行状況に関連付けられているすべてのデータを削除できます。

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");

core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala で指定されているように、必要なパーティションのオフセットを指定することもできます。

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)

ただし、オフセットは時間インデックスではありませんが、各パーティションがシーケンスであることはわかっています。

メッセージにタイムスタンプが含まれている場合 (このタイムスタンプは、Kafka がメッセージを受信した時点とは関係がないことに注意してください)、オフセットを N ずつインクリメントすることにより、段階的に 1 つのエントリを取得しようとするインデクサーを試行できます。タプル (トピック X、パート 2、オフセット 100、タイムスタンプ) のどこかに。

特定の時点からエントリを取得する場合は、必要なエントリが見つかるまで大まかなインデックスにバイナリ検索を適用して、そこから取得できます。

于 2013-06-13T10:09:28.277 に答える
1

Kafka Protocol Doc は、リクエスト/レスポンス/オフセット/メッセージで遊ぶのに最適なソースです: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol you use Simple Consumer example次のコードは状態を示しています。

FetchRequest req = new FetchRequestBuilder()

        .clientId(clientName)

        .addFetch(a_topic, a_partition, readOffset, 100000) 

        .build();

FetchResponse fetchResponse = simpleConsumer.fetch(req);

初期オフセットを開始するように readOffset を設定します。ただし、最大オフセットを確認する必要があります。上記と同様に、addFetch メソッドの最後のパラメーターで FetchSize に従って制限されたオフセット カウントが提供されます。

于 2015-09-01T06:47:50.920 に答える