0

ソース コネクタが構造化されたレコードをテキスト ファイルから読み取り、JSON 形式 (スキーマを使用) でトピックに格納する Kafka 接続セットアップを実行しています。これらのメッセージを Cassandra テーブルに挿入するシンク コネクタが実行されています。このセットアップは正常に実行されていますが、これらのメッセージを HDFS にも転送するために別のシンク コネクタを導入する必要がありました。そこで、HDFSSinkConnector (CP 3.0) を実装してみました。ただし、このコネクタは、メッセージが AVRO 形式であると想定しているため、「データを Avro に逆シリアル化できませんでした」などのエラーがスローされます。

JSON メッセージをソース トピックから Avro 形式の別のトピックにコピーして変換し、HDFS シンク コネクタが新しいトピックを読み取れるようにする方法はありますか? Kafka Streams を使用して実行できますか?

分散した Connect Config ファイルには --

...
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
...

トピックの私のメッセージは次のとおりです-

{"schema":{"type":"struct",
           "fields":[{"type":"string","optional":false,"field":"id"},
                     {"type":"string","optional":false,"field":"name"},
                     {"type":"integer","optional":false,"field":"amount"}
                   ],
       "optional":false,
       "name":"myrec",
       "version":1
      },
 "payload":{"id":"A123","name":"Sample","amount":75}
}

誰でもこれについて私を助けることができますか? 事前に感謝...

4

0 に答える 0