0

Parquet ファイルを動的に読み取り、一意のレコードを抽出する必要があります。各ファイルには、1 つ以上のキー列を含めることができます。

  1. IDファイルに 1 つのキー列があると仮定して、パラメーター を使用して以下のデータ フローを設計しました。基本的なデータの流れ

  2. 集計変換では、ID列ごと にグループ化していパラメーター化された ID 列によるグループ化 ます また、他のすべての列をパススルーできるようにします : 列がAddressID ではなくとして読み取られていることを確認してください残りの列を通過させる ID

  3. select の次のステップで、この ID の名前を AddressID (パラメーター値を使用) に変更しようとしています。 ID 列の名前を元のキー列名に変更する このような出力ショー 動作しません


ハードコードされた値 (アドレス ID) として名前に値を指定しようとしましたが、機能します。


この ID の名前を AddressId (キー列名のパラメータ値) で動的に変更する方法について、私を助けることができますか?

また、上記のシナリオは、キー列が 1 つある場合に発生する可能性があります。Azure Data Factory を使用して、複数のキー列があるシナリオを処理し、動的に処理することはできますか?

これに応じて、adf を使用するか、ADB を使用します。

データ フロー コード:

{
    "name": "RemoveDuplicateRows",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DS_Parquet_DF",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DS_Parquet_Cleaned",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "Aggregate1"
                },
                {
                    "name": "Select1"
                }
            ],
            "script": "parameters{\n\tID as string ('AddressID')\n}\nsource(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'parquet',\n\tpartitionBy('roundRobin', 2)) ~> source1\nsource1 aggregate(groupBy(ID = byName($ID)),\n\teach(match(name!=$ID), $$ = first($$))) ~> Aggregate1\nAggregate1 select(mapColumn(\n\t\teach(match(name=='ID'),\n\t\t\t'AddressID' = $$),\n\t\teach(match(name!='ID'))\n\t),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> Select1\nSelect1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'parquet',\n\ttruncate: true,\n\tpartitionBy('roundRobin', 2),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> sink1"
        }
    }
}

DataFlow スクリプト

parameters{
    ID as string ('AddressID')
}
source(allowSchemaDrift: true,
    validateSchema: false,
    format: 'parquet',
    partitionBy('roundRobin', 2)) ~> source1
source1 aggregate(groupBy(ID = byName($ID)),
    each(match(name!=$ID), $$ = first($$))) ~> Aggregate1
Aggregate1 select(mapColumn(
        each(match(name=='ID'),
            'AddressID' = $$),
        each(match(name!='ID'))
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> Select1
Select1 sink(allowSchemaDrift: true,
    validateSchema: false,
    format: 'parquet',
    truncate: true,
    partitionBy('roundRobin', 2),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> sink1
4

0 に答える 0