TLDR :ダスクバッグからダスクデータフレームを作成しました。dask データフレームは、すべての観測 (イベント) を列として扱います。したがって、各イベントのデータの行ではなく、各イベントの列があります。目標は、パンダが df.T を使用してデータフレームを転置できるのと同じ方法で、列を行に転置することです。
詳細:私のタイムラインのサンプル Twitter データはこちらにあります。私の出発点に到達するために、これは json をディスクから に読み取り、dask.bag
それを に変換するコードですdask.dataframe
import dask.bag as db
import dask.dataframe as dd
import json
b = db.read_text('./sampleTwitter.json').map(json.loads)
df = b.to_dataframe()
df.head()
問題私の個々のイベント (つまりツイート) はすべて、列と行として記録されます。原則tidy
に従って、イベントごとに行を作成したいと思います。 pandas
にはデータフレームの転置メソッドがあり、dask.array には配列の転置メソッドがあります。私の目標は、同じ転置操作を行うことですが、dask データフレームで行います。どうすればいいですか?
- 行を列に変換する
ソリューションの編集
このコードは元の転置の問題を解決し、保持する列を定義して残りを削除することで Twitter json ファイルをクリーンアップし、関数をシリーズに適用して新しい列を作成します。次に、はるかに小さいクリーンなファイルをディスクに書き込みます。
import dask.dataframe as dd
from dask.delayed import delayed
import dask.bag as db
from dask.diagnostics import ProgressBar,Profiler, ResourceProfiler, CacheProfiler
import pandas as pd
import json
import glob
# pull in all files..
filenames = glob.glob('~/sampleTwitter*.json')
# df = ... # do work with dask.dataframe
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
df = dd.from_delayed(dfs)
# see all the fields of the dataframe
fields = list(df.columns)
# identify the fields we want to keep
keepers = ['coordinates','id','user','created_at','lang']
# remove the fields i don't want from column list
for f in keepers:
if f in fields:
fields.remove(f)
# drop the fields i don't want and only keep whats necessary
df = df.drop(fields,axis=1)
clean = df.coordinates.apply(lambda x: (x['coordinates'][0],x['coordinates'][1]), meta= ('coords',tuple))
df['coords'] = clean
# making new filenames from old filenames to save cleaned files
import re
newfilenames = []
for l in filenames:
newfilenames.append(re.search('(?<=\/).+?(?=\.)',l).group()+'cleaned.json')
#newfilenames
# custom saver function for dataframes using newfilenames
def saver(frame,filename):
return frame.to_json('./'+filename)
# converting back to a delayed object
dfs = df.to_delayed()
writes = [(delayed((saver)(df, fn))) for df, fn in zip(dfs, newfilenames)]
# writing the cleaned, MUCH smaller objects back to disk
dd.compute(*writes)