AWS Glue と Apache Hudi を使用して、RDS のデータを S3 にレプリケートします。以下のジョブを実行すると、2 つの parquet ファイル (初期のものと更新されたもの) が S3 バケット (basePath) に生成されます。この場合、最新のファイルが 1 つだけ必要で、古いファイルを削除したいと考えています。
バケットに最新のファイルを 1 つ保持する方法を知っている人はいますか?
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(5))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.show()
tableName = 'hudi_mor_athena_sample'
bucketName = 'cm-sato-hudi-sample--datalake'
basePath = f's3://{bucketName}/{tableName}'
hudi_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
'hoodie.compact.inline': 'false',
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2,
}
df.write.format("hudi"). \
options(**hudi_options). \
mode("overwrite"). \
save(basePath)
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(3))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.show()
# update
df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)
job.commit()