3

私はpysparkが初めてです

次のようなデータセットがあります(いくつかの列のスナップショットのみ)

データ記述

データをキーでグループ化したい。私の鍵は

CONCAT(a.div_nbr,a.cust_nbr)

私の最終的な目標は、データをこのようにフォーマットされた JSON に変換することです

k1[{v1,v2,....},{v1,v2,....}], k2[{v1,v2,....},{v1,v2,....}],....

例えば

248138339 [{ PRECIMA_ID:SCP 00248 0000138339, PROD_NBR:5553505, PROD_DESC:Shot and a Beer Battered Onion Rings (5553505 and 9285840) , PROD_BRND:Molly's Kitchen,PACK_SIZE:4/2.5 LB, QTY_UOM:CA } , 
        { PRECIMA_ID:SCP 00248 0000138339 , PROD_NBR:6659079 , PROD_DESC:Beef Chuck Short Rib Slices, PROD_BRND:Stockyards , PACK_SIZE:12 LBA , QTY_UOM:CA} ,{...,...,} ],

1384611034793[{},{},{}],....

データフレームを作成しました(基本的に2つのテーブルを結合して、さらにフィールドを取得しています)

joinstmt = sqlContext.sql(
          "SELECT a.precima_id , CONCAT(a.div_nbr,a.cust_nbr) as
                  key,a.prod_nbr , a.prod_desc,a.prod_brnd ,      a.pack_size , a.qty_uom , a.sales_opp , a.prc_guidance , a.pim_mrch_ctgry_desc , a.pim_mrch_ctgry_id , b.start_date,b.end_date 

FROM scoop_dtl a join scoop_hdr b on (a.precima_id =b.precima_id)")

さて、上記の結果を得るには、キーに基づいて結果でグループ化する必要があります。次のことを行いました

groupbydf = joinstmt.groupBy("key")

これにより、グループ化されたデータがintpになり、読んだ後、それを直接使用できないことがわかり、データフレームに変換して保存する必要があります。

私はそれに慣れていないので、データフレームに戻すために助けが必要です。または、他の方法もあれば幸いです。

4

2 に答える 2

5

結合されたデータフレームが次のようになっている場合:

gender  age
M   5
F   50
M   10
M   10
F   10

次に、以下のコードを使用して、目的の出力を取得できます

joinedDF.groupBy("gender") \ 
    .agg(collect_list("age").alias("ages")) \
    .write.json("jsonOutput.txt")

出力は次のようになります。

{"gender":"F","ages":[50,10]}
{"gender":"M","ages":[5,10,10]}

名前、給与などの複数の列がある場合。以下のように列を追加できます。

df.groupBy("gender")
    .agg(collect_list("age").alias("ages"),collect_list("name").alias("names"))

出力は次のようになります。

{"gender":"F","ages":[50,10],"names":["ankit","abhay"]}
{"gender":"M","ages":[5,10,10],"names":["snchit","mohit","rohit"]}
于 2018-01-12T09:53:47.303 に答える