私はKafkaProducer
テストケースで使用しており、プロデューサーは のschemaRegistryUrl
ローカルインスタンスを指す を使用していますSchema Registry
。KafkaProducer
スキーマ レジストリとの接続方法をモックする方法はありますか? つまりKafkaProducer/Consumer
、スキーマ レジストリのインスタンスを実行せずにテストで動作させることです。
11387 次
3 に答える
12
絶対あります。KafkaAvroSerializer と KafkaAvroDeserializer の両方に、SchemaRegistryClient を受け取るコンストラクターがあります。MockSchemaRegistryClient を SchemaRegistryClient として使用できます。これを行う方法を示すコード スニペットを次に示します。
private MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
private String registryUrl = "unused";
public <T> Serde<T> getAvroSerde(boolean isKey) {
return Serdes.serdeFrom(getSerializer(isKey), getDeserializer(isKey));
}
private <T> Serializer<T> getSerializer(boolean isKey) {
Map<String, Object> map = new HashMap<>();
map.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, true);
map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
Serializer<T> serializer = (Serializer) new KafkaAvroSerializer(mockSchemaRegistryClient);
serializer.configure(map, isKey);
return serializer;
}
private <T> Deserializer<T> getDeserializer(boolean key) {
Map<String, Object> map = new HashMap<>();
map.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
Deserializer<T> deserializer = (Deserializer) new KafkaAvroDeserializer(mockSchemaRegistryClient);
deserializer.configure(map, key);
return deserializer;
}
于 2018-05-02T18:45:23.197 に答える