1

JSON で Kafka との間で生産/消費します。以下のプロパティを使用して、JSON で HDFS に保存します。

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

プロデューサー :

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
      --data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json"

消費者 :

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties

問題-1:

key.converter.schemas.enable=true

value.converter.schemas.enable=true

例外を取得:

org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)

問題-2:

上記の 2 つのプロパティを有効にしても問題は発生しませんが、hdfs にデータが書き込まれません。

どんな提案でも大歓迎です。

ありがとう

4

2 に答える 2

2

コンバーターは、データが Kafka トピックから変換され、コネクターによって解釈され、HDFS に書き込まれる方法を指します。HDFS コネクタは、すぐに使用できる avro または parquet での HDFS への書き込みのみをサポートします。形式を JSON に拡張する方法に関する情報は、こちら で確認できます。このような拡張機能を作成する場合は、コネクタのオープン ソース プロジェクトに貢献することをお勧めします。

于 2016-11-24T02:33:50.803 に答える