0

pyspark.sql.dataframe.DataFrame" " に保存されているトランザクションを、トランザクションのソース (この場合は顧客 ID) を示すddf列 " "でグループ化したいと考えています。key

グループ化は非常にコストのかかるプロセスであるため、ネストされたスキーマでグループをディスクに書き込みたいと考えています。

(key, [[c1, c2, c3,...], ...])

これにより、すべてのトランザクションをキーにすばやくロードし、グループ化を再実行することなく複雑なカスタム アグリゲーターを開発できます。

ネストされたスキーマを作成してディスクに書き込むにはどうすればよいですか?

4

1 に答える 1

0

答えは簡単ですが、これを理解するのにかなりの時間がかかりました。

まず、すべてのトランザクションをkey(顧客 ID)だけ減らします。

from operators import add
# ddf is a dataframe with a transaction in each row.  Key is the column
# we want to group the transactions by.

txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],) ).reduceByKey(add)

これにより、 のrddようなが得られます(key, [list of Rows])。これを に書き戻すにはdataframe、スキーマを構築する必要があります。トランザクション リストは、 でモデル化できますArrayType

from pyspark.sql import types as sqxt
txn_schema = sqxt.StructType([
    sqxt.StructField('Key', sqxt.StringType()),
    sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema))
])

次に、この構造でデータをディスクに書き込むのは簡単です。

txnddf = txnrdd.toDF(schema=txn_schema)
txnddf.write.parquet('customer-transactions.parquet')

パフォーマンスはOKのようです。RDD を経由せずにこれを行う方法を見つけることができませんでした。

于 2016-05-14T09:39:58.697 に答える