0

次のスキーマを持つ PostgreSQL のテーブルがあります。

                                                       Table "public.kc_ds"
 Column |         Type          | Collation | Nullable |              Default              | Storage  | Stats target | Description
--------+-----------------------+-----------+----------+-----------------------------------+----------+--------------+-------------
 id     | integer               |           | not null | nextval('kc_ds_id_seq'::regclass) | plain    |              |
 num    | integer               |           | not null |                                   | plain    |              |
 text   | character varying(50) |           | not null |                                   | extended |              |
Indexes:
    "kc_ds_pkey" PRIMARY KEY, btree (id)
Publications:
    "dbz_publication"

スキーマ レジストリを使用するこのテーブルに対して Debezium ソース コネクタを実行するio.confluent.connect.avro.AvroConverterと、次のようなスキーマ レジストリ スキーマが作成されます (一部のフィールドはここでは省略されています)。

"fields":[
      {
         "name":"before",
         "type":[
            "null",
            {
               "type":"record",
               "name":"Value",
               "fields":[
                  {
                     "name":"id",
                     "type":"int"
                  },
                  {
                     "name":"num",
                     "type":"int"
                  },
                  {
                     "name":"text",
                     "type":"string"
                  }
               ],
               "connect.name":"xxx.public.kc_ds.Value"
            }
         ],
         "default":null
      },
      {
         "name":"after",
         "type":[
            "null",
            "Value"
         ],
         "default":null
      },
]

Debezium によって生成される私の Kafka トピックのメッセージは次のようになります (一部のフィールドは省略されています)。

{
  "before": null,
  "after": {
    "xxx.public.kc_ds.Value": {
      "id": 2,
      "num": 2,
      "text": "text version 1"
    }
}

INSERT または UPDATE の場合、"before"常にnullであり"after"、データが含まれています。DELETE を実行すると、逆が成り立ち、"after"null であり"before"、データが含まれます (ただし、すべてのフィールドはデフォルト値に設定されます)。

質問 #1:"before" Kafka Connect がフィールドとフィールドを使用してスキーマを作成するのはなぜ"after"ですか? なぜこれらのフィールドはこのように奇妙な振る舞いをするのでしょうか?

質問 2:スキーマ レジストリを使用しながら、Kafka Connect がフラットメッセージをトピックに送信する組み込みの方法はありますか? Flatten変換は私が必要としているものではないことに注意してください: 有効にする"before"と、フィールドとフィールドが残り"after"ます。

質問 #3 (実際には何も望んでいませんが、誰かが知っているかもしれません):メッセージを平坦化する必要があるのは、 HudiDeltaStreamerを使用してトピックからデータを読み取る必要があるという事実から来ており、このツールは平坦な入力データを想定しているようです。"before"およびフィールドは、最終的に結果の.parquetファイル"after"内の個別のオブジェクトのような列になります。HudiDeltaStreamer が Kafka Connect によって生成されたメッセージとどのように統合されるのか、誰にもわかりませんか?

4

0 に答える 0