6

私は Kafka と avro がまったく初めてで、confluent パッケージを使用しようとしています。JPA に使用する既存の POJO があり、各値を汎用レコードに手動で反映することなく、POJO のインスタンスを簡単に生成できるようにしたいと考えています。ドキュメントでこれがどのように行われるかが欠けているようです。

例では汎用レコードを使用し、次のように各値を 1 つずつ設定します。

String key = "key1";
String userSchema = "{\"type\":\"record\"," +
                    "\"name\":\"myrecord\"," +
                    "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");

record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
  producer.send(record);
} catch(SerializationException e) {
  // may need to do something with it
}

クラスからスキーマを取得する例がいくつかあり、必要に応じてそのスキーマを変更するための注釈を見つけました。では、POJO のインスタンスを取得し、それをそのままシリアライザーに送信して、クラスからスキーマを照合し、値を汎用レコードにコピーする作業をライブラリに行わせるにはどうすればよいでしょうか? 私はこれについてすべて間違っていますか?私がやりたいことは次のようなものです:

String key = "key1";
Schema schema = ReflectData.get().getSchema(myObject.getClass());
GenericRecord avroRecord = ReflectData.get().getRecord(myObject, schema);

record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
  producer.send(record);
} catch(SerializationException e) {
  // may need to do something with it
}

ありがとう!

4

1 に答える 1

1

このインスタンスで独自のシリアライザーを作成することになりました。

public class KafkaAvroReflectionSerializer extends KafkaAvroSerializer {
   private final EncoderFactory encoderFactory = EncoderFactory.get();

   @Override
   protected byte[] serializeImpl(String subject, Object object) throws SerializationException {
      //TODO: consider caching schemas
      Schema schema = null;

      if(object == null) {
         return null;
      } else {
         try {
            schema = ReflectData.get().getSchema(object.getClass());
            int e = this.schemaRegistry.register(subject, schema);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(0);
            out.write(ByteBuffer.allocate(4).putInt(e).array());

            BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
            DatumWriter<Object> writer = new ReflectDatumWriter<>(schema);
            writer.write(object, encoder);
            encoder.flush();
            out.close();

            byte[] bytes = out.toByteArray();
            return bytes;
         } catch (IOException ioe) {
            throw new SerializationException("Error serializing Avro message", ioe);
         } catch (RestClientException rce) {
            throw new SerializationException("Error registering Avro schema: " + schema, rce);
         } catch (RuntimeException re) {
            throw new SerializationException("Error serializing Avro message", re);
         }
      }
   }
}
于 2016-01-07T00:09:03.773 に答える