ほとんど似た 2 つのカフカ アプリケーションがあります。どちらも、2 つのテーブルの変更について binlog をリッスンします。私の問題は、そのうちの 1 つが正常に動作することですが、2 つ目を起動しようとすると、次の例外が発生します。
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Key","namespace":"mysql.company.payments","fields":[{"name":"id","type":"long"}],"connect.name":"mysql.company.payments.Key"} Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
エラーが指すスキーマ ファイルの内容は次のとおりです。
{
"type": "record",
"name": "Key",
"namespace": "mysql.company.payments",
"fields": [
{
"name": "id",
"type": "long"
}
],
"connect.name": "mysql.company.payments.Key"
}
動作している他のアプリケーションにはまったく同じ avro ファイルがありますが、テーブル (支払い) の名前が置き換えられています。どちらのアプリも同じサーバーから実行され、同じ Kafka クラスターに接続されています。maven プラグインを使用して、avro ファイルに基づいて Java クラスを作成します。クラス Key.class が正常に作成されます。
これらは、私のアプリケーションの 2 つの重要なクラスです。
メインクラス
import com.company.util.Configs;
import error.PaymentSerializationException;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import payment.PaymentUpdateListener;
import java.util.Properties;
public class PaymentsMain {
static Properties properties;
public static void main(String[] args) {
StreamsBuilder builder = new StreamsBuilder();
properties = configProperties();
StreamsBuilder streamsBuilder = watchForPaymentUpdate(builder);
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}
private static StreamsBuilder watchForPaymentUpdate(StreamsBuilder builder){
PaymentUpdateListener paymentUpdateListener = new PaymentUpdateListener(builder);
paymentUpdateListener.start();
return builder;
}
private static Properties configProperties(){
Properties streamProperties = new Properties();
streamProperties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Configs.getConfig("schemaRegistryUrl"));
streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "payment-kafka");
streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Configs.getConfig("bootstrapServerUrl"));
streamProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
streamProperties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/state_dir");
streamProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "3");
streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
streamProperties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
PaymentSerializationException.class);
return streamProperties;
}
}
ストリーム クラス
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
public class PaymentUpdateListener {
private StreamsBuilder builder;
public PaymentUpdateListener(StreamsBuilder builder) {
this.builder = builder;
}
public void start(){
builder.stream("mysql.company.payments",
Consumed.with(PaymentSerde.getGenericKeySerde(), PaymentSerde.getEnvelopeSerde()))
.to("kafka-consumer.payment");
}
}