2

大量のログ ファイルを処理しており、ジョブを Spark に移行したいのですが、Pandas で簡単にできるように、イベント ベースの時間枠でイベントを集計する方法がわかりません。

これがまさに私がやりたいことです:

何らかのイベントを経験したユーザーのログ ファイル (以下でシミュレート) について、7 日間さかのぼり、他のすべての列の集計を返したいと思います。

パンダの中はこちら。これを PySpark に移植する方法はありますか?

import pandas as pd
df = pd.DataFrame({'user_id':[1,1,1,2,2,2], 'event':[0,1,0,0,0,1], 'other':[12, 20, 16, 84, 11, 15] , 'event_date':['2015-01-01 00:02:43', '2015-01-04 00:02:03', '2015-01-10 00:12:26', '2015-01-01 00:02:43', '2015-01-06 00:02:43', '2015-01-012 18:10:09']})
df['event_date'] = pd.to_datetime(df['event_date'])
df

与えます:

    event  event_date           other  user_id
0   0      2015-01-01 00:02:43  12     1
1   1      2015-01-04 00:02:03  20     1
2   0      2015-01-10 00:12:26  16     1
3   0      2015-01-01 00:02:43  84     2
4   0      2015-01-06 00:02:43  11     2
5   1      2015-01-12 18:10:09  15     2

この DataFrame を user_id でグループ化し、「イベント」から 7 日より古い行を集計から除外したいと思います。

パンダでは、次のようになります。

def f(x):
    # Find event
    win = x.event == 1

    # Get the date when event === 1
    event_date = list(x[win]['event_date'])[0]

    # Construct the window
    min_date = event_date - pd.DateOffset(days=7) 

    # Set x to this specific date window
    x = x[(x.event_date > min_date) & (x.event_date <= event_date)]

    # Aggregate other
    x['other'] = x.other.sum()

    return x[win] #, x[z]])


df.groupby(by='user_id').apply(f).reset_index(drop=True)

目的の出力を提供します (ユーザーごとに 1 行、ここで event_date は event==1 に対応します):

    event   event_date          other   user_id
0   1       2015-01-04 00:02:03 32      1
1   1       2015-01-12 18:10:09 26      2

Spark でこの結果を取得するには、どこから始めればよいか知っている人はいますか?

4

1 に答える 1