96

トピックからすべてのデータを削除する方法、またはすべての実行前にトピックを削除する方法はありますか?

プロパティを変更するために KafkaConfig.scala ファイルを変更できlogRetentionHoursますか? 消費者がメッセージを読むとすぐにメッセージを削除する方法はありますか?

プロデューサーを使用してどこかからデータをフェッチし、コンシューマーが消費する特定のトピックにデータを送信しています。実行ごとにそのトピックからすべてのデータを削除できますか? トピックで毎回新しいデータのみが必要です。どうにかしてトピックを再初期化する方法はありますか?

4

14 に答える 14

76

ここで述べたように、Kafka Queue をパージします。

クイックスタートの例として、Kafka 0.8.2 でテスト済み: まず、config フォルダーの下の server.properties ファイルに 1 行を追加します。

delete.topic.enable=true

次に、次のコマンドを実行できます。

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
于 2015-06-14T20:06:04.703 に答える
7

中程度の成功レベルで、他の回答が説明していることをほとんど試しました。私たち (Apache Kafka 0.8.1) で実際に機能したのは、class コマンドです。

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost:2181

于 2014-11-20T19:07:44.773 に答える
1

以下のユーティリティを使用して、統合テストの実行後にクリーンアップします。

最新のAdminZkClientAPIを使用しています。古い API は廃止されました。

import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time

class ZookeeperUtils @Inject() (config: AppConfig) {

  val testTopic = "users_1"

  val zkHost = config.KafkaConfig.zkHost
  val sessionTimeoutMs = 10 * 1000
  val connectionTimeoutMs = 60 * 1000
  val isSecure = false
  val maxInFlightRequests = 10
  val time: Time = Time.SYSTEM

  def cleanupTopic(config: AppConfig) = {

    val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
    val zkUtils = new AdminZkClient(zkClient)

    val pp = new Properties()
    pp.setProperty("delete.retention.ms", "10")
    pp.setProperty("file.delete.delay.ms", "1000")
    zkUtils.changeTopicConfig(testTopic , pp)
    //    zkUtils.deleteTopic(testTopic)

    println("Waiting for topic to be purged. Then reset to retain records for the run")
    Thread.sleep(60000L)

    val resetProps = new Properties()
    resetProps.setProperty("delete.retention.ms", "3000000")
    resetProps.setProperty("file.delete.delay.ms", "4000000")
    zkUtils.changeTopicConfig(testTopic , resetProps)

  }


}

オプションのトピックの削除があります。ただし、トピックを削除対象としてマークします。Zookeeper は後でトピックを削除します。これは予想外に長くなる可能性があるため、retention.ms アプローチを好みます。

于 2019-10-01T10:36:01.650 に答える
0

私はこのスクリプトを使用します:

#!/bin/bash
topics=`kafka-topics --list --zookeeper zookeeper:2181`
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
    done
done
sleep 60
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
    done
done
于 2019-06-07T13:06:26.653 に答える