以下のユーティリティを使用して、統合テストの実行後にクリーンアップします。
最新のAdminZkClient
APIを使用しています。古い API は廃止されました。
import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time
class ZookeeperUtils @Inject() (config: AppConfig) {
val testTopic = "users_1"
val zkHost = config.KafkaConfig.zkHost
val sessionTimeoutMs = 10 * 1000
val connectionTimeoutMs = 60 * 1000
val isSecure = false
val maxInFlightRequests = 10
val time: Time = Time.SYSTEM
def cleanupTopic(config: AppConfig) = {
val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
val zkUtils = new AdminZkClient(zkClient)
val pp = new Properties()
pp.setProperty("delete.retention.ms", "10")
pp.setProperty("file.delete.delay.ms", "1000")
zkUtils.changeTopicConfig(testTopic , pp)
// zkUtils.deleteTopic(testTopic)
println("Waiting for topic to be purged. Then reset to retain records for the run")
Thread.sleep(60000L)
val resetProps = new Properties()
resetProps.setProperty("delete.retention.ms", "3000000")
resetProps.setProperty("file.delete.delay.ms", "4000000")
zkUtils.changeTopicConfig(testTopic , resetProps)
}
}
オプションのトピックの削除があります。ただし、トピックを削除対象としてマークします。Zookeeper は後でトピックを削除します。これは予想外に長くなる可能性があるため、retention.ms アプローチを好みます。