Parquet ファイルを動的に読み取り、一意のレコードを抽出する必要があります。各ファイルには、1 つ以上のキー列を含めることができます。
集計変換では、
ID
列ごと にグループ化してい ます また、他のすべての列をパススルーできるようにします 注: 列がAddressID ではなくとして読み取られていることを確認してくださいID
select の次のステップで、この ID の名前を AddressID (パラメーター値を使用) に変更しようとしています。 このような出力ショー
ハードコードされた値 (アドレス ID) として名前に値を指定しようとしましたが、機能します。
この ID の名前を AddressId (キー列名のパラメータ値) で動的に変更する方法について、私を助けることができますか?
また、上記のシナリオは、キー列が 1 つある場合に発生する可能性があります。Azure Data Factory を使用して、複数のキー列があるシナリオを処理し、動的に処理することはできますか?
これに応じて、adf を使用するか、ADB を使用します。
データ フロー コード:
{
"name": "RemoveDuplicateRows",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "DS_Parquet_DF",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "DS_Parquet_Cleaned",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [
{
"name": "Aggregate1"
},
{
"name": "Select1"
}
],
"script": "parameters{\n\tID as string ('AddressID')\n}\nsource(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'parquet',\n\tpartitionBy('roundRobin', 2)) ~> source1\nsource1 aggregate(groupBy(ID = byName($ID)),\n\teach(match(name!=$ID), $$ = first($$))) ~> Aggregate1\nAggregate1 select(mapColumn(\n\t\teach(match(name=='ID'),\n\t\t\t'AddressID' = $$),\n\t\teach(match(name!='ID'))\n\t),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> Select1\nSelect1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'parquet',\n\ttruncate: true,\n\tpartitionBy('roundRobin', 2),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> sink1"
}
}
}
DataFlow スクリプト
parameters{
ID as string ('AddressID')
}
source(allowSchemaDrift: true,
validateSchema: false,
format: 'parquet',
partitionBy('roundRobin', 2)) ~> source1
source1 aggregate(groupBy(ID = byName($ID)),
each(match(name!=$ID), $$ = first($$))) ~> Aggregate1
Aggregate1 select(mapColumn(
each(match(name=='ID'),
'AddressID' = $$),
each(match(name!='ID'))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> Select1
Select1 sink(allowSchemaDrift: true,
validateSchema: false,
format: 'parquet',
truncate: true,
partitionBy('roundRobin', 2),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> sink1