カフカでトピックをパージする方法はありますか?
大きすぎるメッセージをローカル マシンの kafka メッセージ トピックにプッシュしたところ、エラーが発生しました。
kafka.common.InvalidMessageSizeException: invalid message size
ここで を大きくするのfetch.size
は理想的ではありません。実際にはそれほど大きなメッセージを受け入れたくないからです。
カフカでトピックをパージする方法はありますか?
大きすぎるメッセージをローカル マシンの kafka メッセージ トピックにプッシュしたところ、エラーが発生しました。
kafka.common.InvalidMessageSizeException: invalid message size
ここで を大きくするのfetch.size
は理想的ではありません。実際にはそれほど大きなメッセージを受け入れたくないからです。
トピックの保持時間を一時的に 1 秒に更新します。
kafka-topics.sh \
--zookeeper <zkhost>:2181 \
--alter \
--topic <topic name> \
--config retention.ms=1000
また、新しい Kafka リリースでは、次の方法でも実行できます。kafka-configs --entity-type topics
kafka-configs.sh \
--zookeeper <zkhost>:2181 \
--entity-type topics \
--alter \
--entity-name <topic name> \
--add-config retention.ms=1000
次に、パージが有効になるまで待ちます (期間はトピックのサイズによって異なります)。消去したら、以前のretention.ms
値を復元します。
キューをパージするには、トピックを削除できます。
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
次に再作成します。
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
受け入れられた答えは正しいですが、その方法は廃止されました。トピックの設定は、 を介して行う必要がありますkafka-configs
。
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
この方法で設定された構成は、次のコマンドで表示できます
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
クイックスタートの例として、Kafka 0.8.2 でテスト済み: まず、config フォルダーの下の server.properties ファイルに 1 行を追加します。
delete.topic.enable=true
次に、次のコマンドを実行できます。
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
次に、クライアントが空のトピックに対して操作を続行できるように、それを再作成します
更新: この回答は Kafka 0.6 に関連しています。Kafka 0.8 以降については、@Patrick による回答を参照してください。
はい、kafka を停止し、対応するサブディレクトリからすべてのファイルを手動で削除します (kafka データ ディレクトリで簡単に見つけることができます)。kafka の再起動後、トピックは空になります。
場合によっては、クラスターが飽和している場合 (パーティションが多すぎる、暗号化されたトピック データを使用している、SSL を使用している、コントローラーが不良ノードにある、または接続が不安定な場合、そのトピックをパージするのに長い時間がかかることがあります)。 .
特に Avro を使用している場合は、次の手順に従います。
1: kafka ツールで実行:
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
2: 実行:
kafka-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: トピックが空になったら、トピックの保持を元の設定に戻します。
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
簡単に宣伝できないので、これが誰かに役立つことを願っています。
Thomas のアドバイスは素晴らしいものですが、残念ながらzkCli
古いバージョンの Zookeeper (たとえば 3.3.6) ではrmr
. たとえば、最新の Zookeeperのコマンド ライン実装をバージョン 3.3と比較してください。
Zookeeper の古いバージョンに直面している場合、解決策の 1 つは、Python 用のzc.zkなどのクライアント ライブラリを使用することです。Python に慣れていない人は、 pipまたはeasy_installを使用してインストールする必要があります。次に、Python シェル ( python
) を開始すると、次のことができます。
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
あるいは
zk.delete_recursive('brokers')
Kafka からすべてのトピックを削除する場合。
最も簡単な方法は、個々のログ ファイルの日付を保存期間よりも古い日付に設定することです。その後、ブローカーはそれらをクリーンアップし、数秒以内に削除します。これにはいくつかの利点があります。
Kafka 0.7.x での私の経験では、ログ ファイルを削除してブローカーを再起動すると、特定のコンシューマーに対して無効なオフセット例外が発生する可能性がありました。これは、ブローカが (既存のログ ファイルがない場合に) ゼロでオフセットを再起動し、以前にトピックから消費していたコンシューマーが再接続して特定の [有効になった] オフセットを要求するために発生します。このオフセットがたまたま新しいトピック ログの範囲外にある場合、害はなく、コンシューマーは最初または最後から再開します。ただし、オフセットが新しいトピック ログの範囲内にある場合、ブローカはメッセージ セットの取得を試みますが、オフセットが実際のメッセージと一致しないため失敗します。
これは、そのトピックの Zookeeper の消費者オフセットもクリアすることで軽減できます。しかし、未使用のトピックが必要なく、既存のコンテンツを削除したいだけの場合は、ブローカを停止し、トピック ログを削除し、特定の Zookeeper ノードをクリアするよりも、いくつかのトピック ログを単に「タッチ」する方がはるかに簡単で信頼性が高くなります。 .
保持.ms と保持.bytes を更新する以外に、トピックのクリーンアップ ポリシーは「削除」(デフォルト) である必要があることに気付きました。「コンパクト」の場合、メッセージが長く保持されます。delete.retention.msも指定します。
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
また、これが正常に発生したことを確認するために、最も古い/最新のオフセットを監視する必要があり、du -h /tmp/kafka-logs/test-topic-3-100-* も確認できます。
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762
もう1つの問題は、最初に現在の構成を取得する必要があるため、削除が成功した後に元に戻すことを忘れないでください:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
アプリケーション グループを使用して特定のトピックからのすべてのメッセージをクリーンアップするには (GroupName はアプリケーションの kafka グループ名と同じである必要があります)。
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group