4

TLDR :ダスクバッグからダスクデータフレームを作成しました。dask データフレームは、すべての観測 (イベント) を列として扱います。したがって、各イベントのデータの行ではなく、各イベントの列があります。目標は、パンダが df.T を使用してデータフレームを転置できるのと同じ方法で、列を行に転置することです。

詳細:私のタイムラインのサンプル Twitter データはこちらにあります。私の出発点に到達するために、これは json をディスクから に読み取り、dask.bagそれを に変換するコードですdask.dataframe

import dask.bag as db
import dask.dataframe as dd
import json


b = db.read_text('./sampleTwitter.json').map(json.loads)
df = b.to_dataframe()
df.head()

問題私の個々のイベント (つまりツイート) はすべて、列と行として記録されます。原則tidyに従って、イベントごとに行を作成したいと思います。 pandasにはデータフレームの転置メソッドがあり、dask.array には配列の転置メソッドがあります。私の目標は、同じ転置操作を行うことですが、dask データフレームで行います。どうすればいいですか?

  1. 行を列に変換する

ソリューションの編集

このコードは元の転置の問題を解決し、保持する列を定義して残りを削除することで Twitter json ファイルをクリーンアップし、関数をシリーズに適用して新しい列を作成します。次に、はるかに小さいクリーンなファイルをディスクに書き込みます。

import dask.dataframe as dd
from dask.delayed import delayed
import dask.bag as db
from dask.diagnostics import ProgressBar,Profiler, ResourceProfiler, CacheProfiler
import pandas as pd
import json
import glob

# pull in all files..
filenames = glob.glob('~/sampleTwitter*.json')


# df = ... # do work with dask.dataframe
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
df = dd.from_delayed(dfs)


# see all the fields of the dataframe 
fields = list(df.columns)

# identify the fields we want to keep
keepers = ['coordinates','id','user','created_at','lang']

# remove the fields i don't want from column list
for f in keepers:
    if f in fields:
        fields.remove(f)

# drop the fields i don't want and only keep whats necessary
df = df.drop(fields,axis=1)

clean = df.coordinates.apply(lambda x: (x['coordinates'][0],x['coordinates'][1]), meta= ('coords',tuple))
df['coords'] = clean

# making new filenames from old filenames to save cleaned files
import re
newfilenames = []
for l in filenames:
    newfilenames.append(re.search('(?<=\/).+?(?=\.)',l).group()+'cleaned.json')
#newfilenames

# custom saver function for dataframes using newfilenames
def saver(frame,filename):
    return frame.to_json('./'+filename)

# converting back to a delayed object
dfs = df.to_delayed()
writes = [(delayed((saver)(df, fn))) for df, fn in zip(dfs, newfilenames)]

# writing the cleaned, MUCH smaller objects back to disk
dd.compute(*writes)
4

1 に答える 1

1

次のようなコードを使用して、バッグを完全にバイパスすることで、必要な結果を得ることができると思います

import glob

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

filenames = glob.glob('sampleTwitter*.json')
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
ddf = dd.from_delayed(dfs)
于 2016-08-06T01:41:54.600 に答える