0

Qubole の S3 シンクを使用して、Avro データを S3 に Parquet 形式でロードしています。

Java アプリケーションでプロデューサーを作成します

Properties props = new Properties();
props.put("bootstrap.servers", KafkaHelper.getServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<byte[], byte[]>(props);

次に、 aを次の形式に変換GenericRecordします。byte[]

GenericRecord avroRecord = new GenericData.Record(avroSchema);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);

for (Map.Entry<String, ?> entry : map.entrySet()) {
    String key = entry.getKey();
    Object value = entry.getValue();
    avroRecord.put(key, value);
}

ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, recordInjection.apply(avroRecord));
producer.send(record);

Kafka Connect プロパティで次の値を使用します。

key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

そして、ファイル シンク プロパティの次の構成オプション:

connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

コネクタを実行すると、「java.lang.IllegalArgumentException: Avro スキーマはレコードである必要があります」というエラー メッセージが表示されます。

私はKafka Connectを初めて使用し、スキーマレジストリサーバーをセットアップできることを知っていますが、シンクがAvroデータをParquetに変換するためにレジストリを必要とするかどうか、またはこれが何らかの種類のものであるかどうかはわかりません私の側のフォーマットまたは構成の問題。このエラーのコンテキストで「レコード」とはどのようなデータ形式を指しますか? どんな指示や助けも大歓迎です。

4

1 に答える 1