次のことを行う python スクリプトがあります。データの入力ファイル (通常はネストされた JSON 形式) を受け取ります。iii.データを所望のフォーマットに操作する別の関数に1行ずつデータを渡す。最後に、出力をファイルに書き込みます。
これがこれを行う私の現在の単純なpython行です...
def manipulate(line):
# a pure python function which transforms the data
# ...
return manipulated_json
for line in f:
components.append(manipulate(ujson.loads(line)))
write_to_csv(components)`
これは機能しますが、python GIL がサーバー上の 1 つのコアに制限しているため、特に大量のデータでは、非常に遅くなります。
私が通常扱うデータの量は、gzip で圧縮された約 4 ギガですが、時には数百ギガの gzip で圧縮されたデータを処理する必要があります。必ずしもビッグデータではありませんが、すべてをメモリ内で処理することはできず、Python の GIL では処理が非常に遅くなります。
データ処理を最適化するソリューションを探しているときに、das に出会いました。当時の私には PySpark が当然の解決策のように思えましたが、dask の約束とそのシンプルさに心を奪われ、試してみることにしました。
dask とその使用方法について多くの調査を行った後、現在のプロセスを再現するための非常に小さなスクリプトを作成しました。スクリプトは次のようになります。
import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`
これは機能し、元の非 Dask スクリプトと同じ結果を生成しますが、サーバー上で1 つの CPU しか使用しません。だから、それはまったく役に立ちませんでした。実際、それはより遅いです。
私は何を間違っていますか?何か不足していますか?私はまだdaskにかなり慣れていないので、何かを見落としているか、まったく別のことをすべきかどうかを教えてください.
また、必要なことにサーバーの全容量 (つまり、すべての CPU) を使用するための dask の代替手段はありますか?
ありがとう、
T