答えは簡単ですが、これを理解するのにかなりの時間がかかりました。
まず、すべてのトランザクションをkey
(顧客 ID)だけ減らします。
from operators import add
# ddf is a dataframe with a transaction in each row. Key is the column
# we want to group the transactions by.
txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],) ).reduceByKey(add)
これにより、 のrdd
ようなが得られます(key, [list of Rows])
。これを に書き戻すにはdataframe
、スキーマを構築する必要があります。トランザクション リストは、 でモデル化できますArrayType
。
from pyspark.sql import types as sqxt
txn_schema = sqxt.StructType([
sqxt.StructField('Key', sqxt.StringType()),
sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema))
])
次に、この構造でデータをディスクに書き込むのは簡単です。
txnddf = txnrdd.toDF(schema=txn_schema)
txnddf.write.parquet('customer-transactions.parquet')
パフォーマンスはOKのようです。RDD を経由せずにこれを行う方法を見つけることができませんでした。