6

次のことを行う python スクリプトがあります。データの入力ファイル (通常はネストされた JSON 形式) を受け取ります。iii.データを所望のフォーマットに操作する別の関数に1行ずつデータを渡す。最後に、出力をファイルに書き込みます。

これがこれを行う私の現在の単純なpython行です...

def manipulate(line):
    # a pure python function which transforms the data
    # ...
    return manipulated_json

for line in f:
    components.append(manipulate(ujson.loads(line)))
    write_to_csv(components)`

これは機能しますが、python GIL がサーバー上の 1 つのコアに制限しているため、特に大量のデータでは、非常に遅くなります。

私が通常扱うデータの量は、gzip で圧縮された約 4 ギガですが、時には数百ギガの gzip で圧縮されたデータを処理する必要があります。必ずしもビッグデータではありませんが、すべてをメモリ内で処理することはできず、Python の GIL では処理が非常に遅くなります。

データ処理を最適化するソリューションを探しているときに、das に出会いました。当時の私には PySpark が当然の解決策のように思えましたが、dask の約束とそのシンプルさに心を奪われ、試してみることにしました。

dask とその使用方法について多くの調査を行った後、現在のプロセスを再現するための非常に小さなスクリプトを作成しました。スクリプトは次のようになります。

import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`

これは機能し、元の非 Dask スクリプトと同じ結果を生成しますが、サーバー上で1 つの CPU しか使用しません。だから、それはまったく役に立ちませんでした。実際、それはより遅いです。

私は何を間違っていますか?何か不足していますか?私はまだdaskにかなり慣れていないので、何かを見落としているか、まったく別のことをすべきかどうかを教えてください.

また、必要なことにサーバーの全容量 (つまり、すべての CPU) を使用するための dask の代替手段はありますか?

ありがとう、

T

4

3 に答える 3

0

glob ベースのファイル名 (たとえばMyFiles-*.csv、dask)dataframe.to_csv()を指定すると、データフレームをディスクに出力できるはずです。1 つの大きな csv ファイルではなく、複数のファイルが作成されます。詳細については、このスレッドを参照して ください https://groups.google.com/a/continuum.io/forum/#!searchin/blaze-dev/to_csv/blaze-dev/NCQfCoOWEcI/S7fwuCfeCgAJ

MyFiles-0001.csv  
MyFiles-0002.csv 
....
于 2016-08-26T18:36:11.830 に答える
0

バッグは、仕切りの数だけ平行になるようです。

私にとって、走っている

mybag=bag.from_filenames(filename, chunkbytes=1e7)
mybag.npartitions

収量

1746年

これにより問題が解決され、処理が完全に並列化可能になりました。

于 2016-01-05T12:25:31.940 に答える