次のコードを使用しています (実際にはそうではありませんが、仮定しましょう) スキーマを作成し、プロデューサーによって kafka に送信します。
public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"str2\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
問題は、コードにより、このスキーマで 1 つのメッセージしか送信できないことです。次に、次のメッセージを送信するためにスキーマ名を変更する必要があります...そのため、名前文字列は現在ランダムに生成されているため、さらにメッセージを送信できます。これはハックなので、これを行う適切な方法を知りたいです。
また、スキーマなしでメッセージを送信する方法も調べました (つまり、スキーマを含む 1 つのメッセージを既に kafka に送信したので、他のすべてのメッセージはもうスキーマを必要としません) - ただしnew GenericData.Record(..)
、スキーマ パラメーターが必要です。null の場合、エラーがスローされます。
では、avro スキーマ メッセージを kafka に送信する正しい方法は何ですか?
これは別のコードサンプルです - 私のものとかなり同じです
: /confluent/examples/producer/ProducerExample.java
また、スキーマを設定せずに送信する方法も示していません。