2

次のコードを使用しています (実際にはそうではありませんが、仮定しましょう) スキーマを作成し、プロデューサーによって kafka に送信します。

public static final String USER_SCHEMA = "{"
        + "\"type\":\"record\","
        + "\"name\":\"myrecord\","
        + "\"fields\":["
        + "  { \"name\":\"str1\", \"type\":\"string\" },"
        + "  { \"name\":\"str2\", \"type\":\"string\" },"
        + "  { \"name\":\"int1\", \"type\":\"int\" }"
        + "]}";

public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 1000; i++) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);

        byte[] bytes = recordInjection.apply(avroRecord);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);

        Thread.sleep(250);

    }

    producer.close();
}

問題は、コードにより、このスキーマで 1 つのメッセージしか送信できないことです。次に、次のメッセージを送信するためにスキーマ名を変更する必要があります...そのため、名前文字列は現在ランダムに生成されているため、さらにメッセージを送信できます。これはハックなので、これを行う適切な方法を知りたいです。

また、スキーマなしでメッセージを送信する方法も調べました (つまり、スキーマを含む 1 つのメッセージを既に kafka に送信したので、他のすべてのメッセージはもうスキーマを必要としません) - ただしnew GenericData.Record(..)、スキーマ パラメーターが必要です。null の場合、エラーがスローされます。

では、avro スキーマ メッセージを kafka に送信する正しい方法は何ですか?

これは別のコードサンプルです - 私のものとかなり同じです
: /confluent/examples/producer/ProducerExample.java

また、スキーマを設定せずに送信する方法も示していません。

4

1 に答える 1