Kafka-Connect を使用して Kafka-Elasticsearch コネクタを実装しています。
プロデューサーは複雑な JSON を Kafka トピックに送信し、コネクタ コードはこれを使用して Elastic Search に永続化します。コネクタは構造体 ( https://kafka.apache.org/0100/javadoc/org/apache/kafka/connect/data/Struct.html )の形式でデータを取得します。
トップレベルの Json で構造体のフィールド値を取得できますが、ネストされた json からフェッチすることはできません。
{
"after": {
"test.test.employee.Value": {
"id": 5671111,
"name": {
"string": "abc"
}
}
},
"op": "u",
"ts_ms": {
"long": 1474892835943
}
}
「op」は解析できますが、「test.test.employee.Value」は解析できません。
Struct afterStruct = struct.getStruct("after"); // giving me proper value.
String opValue = struct.getString("op"); // giving me proper value of "u".
Struct valueStruct = afterStruct .getStruct("test.test.employee.Value"); // org.apache.kafka.connect.errors.DataException: test.test.employee.Value is not a valid field name