0

AWS Glue と Apache Hudi を使用して、RDS のデータを S3 にレプリケートします。以下のジョブを実行すると、2 つの parquet ファイル (初期のものと更新されたもの) が S3 バケット (basePath) に生成されます。この場合、最新のファイルが 1 つだけ必要で、古いファイルを削除したいと考えています。

バケットに最新のファイルを 1 つ保持する方法を知っている人はいますか?

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(5))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.show()

tableName = 'hudi_mor_athena_sample' 
bucketName = 'cm-sato-hudi-sample--datalake'
basePath = f's3://{bucketName}/{tableName}'

hudi_options = {
  'hoodie.table.name': tableName,
  'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
  'hoodie.compact.inline': 'false',
  'hoodie.datasource.write.recordkey.field': 'uuid', 
  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  'hoodie.datasource.write.table.name': tableName,
  'hoodie.datasource.write.operation': 'upsert', 
  'hoodie.datasource.write.precombine.field': 'ts', 
  'hoodie.upsert.shuffle.parallelism': 2, 
  'hoodie.insert.shuffle.parallelism': 2,
}

df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)

updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(3))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.show()

# update
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)

job.commit()
4

1 に答える 1