次のスキーマを持つ PostgreSQL のテーブルがあります。
Table "public.kc_ds"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+-----------------------+-----------+----------+-----------------------------------+----------+--------------+-------------
id | integer | | not null | nextval('kc_ds_id_seq'::regclass) | plain | |
num | integer | | not null | | plain | |
text | character varying(50) | | not null | | extended | |
Indexes:
"kc_ds_pkey" PRIMARY KEY, btree (id)
Publications:
"dbz_publication"
スキーマ レジストリを使用するこのテーブルに対して Debezium ソース コネクタを実行するio.confluent.connect.avro.AvroConverter
と、次のようなスキーマ レジストリ スキーマが作成されます (一部のフィールドはここでは省略されています)。
"fields":[
{
"name":"before",
"type":[
"null",
{
"type":"record",
"name":"Value",
"fields":[
{
"name":"id",
"type":"int"
},
{
"name":"num",
"type":"int"
},
{
"name":"text",
"type":"string"
}
],
"connect.name":"xxx.public.kc_ds.Value"
}
],
"default":null
},
{
"name":"after",
"type":[
"null",
"Value"
],
"default":null
},
]
Debezium によって生成される私の Kafka トピックのメッセージは次のようになります (一部のフィールドは省略されています)。
{
"before": null,
"after": {
"xxx.public.kc_ds.Value": {
"id": 2,
"num": 2,
"text": "text version 1"
}
}
INSERT または UPDATE の場合、"before"
常にnull
であり"after"
、データが含まれています。DELETE を実行すると、逆が成り立ち、"after"
null であり"before"
、データが含まれます (ただし、すべてのフィールドはデフォルト値に設定されます)。
質問 #1:"before"
Kafka Connect がフィールドとフィールドを使用してスキーマを作成するのはなぜ"after"
ですか? なぜこれらのフィールドはこのように奇妙な振る舞いをするのでしょうか?
質問 2:スキーマ レジストリを使用しながら、Kafka Connect がフラットメッセージをトピックに送信する組み込みの方法はありますか? Flatten変換は私が必要としているものではないことに注意してください: 有効にする"before"
と、フィールドとフィールドが残り"after"
ます。
質問 #3 (実際には何も望んでいませんが、誰かが知っているかもしれません):メッセージを平坦化する必要があるのは、 HudiDeltaStreamerを使用してトピックからデータを読み取る必要があるという事実から来ており、このツールは平坦な入力データを想定しているようです。"before"
およびフィールドは、最終的に結果の.parquetファイル"after"
内の個別のオブジェクトのような列になります。HudiDeltaStreamer が Kafka Connect によって生成されたメッセージとどのように統合されるのか、誰にもわかりませんか?