1

私は m4.4xlarge (64 GB RAM) EC2 ボックスを持っています。私はパンダでダスクを実行しています。次のメモリ エラーが発生します。

約 24 時間の実行後にこれを取得します。これは、タスクが完了するのにかかるおおよその時間であるため、エラーが RAM の不足によるものかどうか、スクリプトの最後にディスク メモリが原因であるかどうかはわかりません。 .to_csv() を使用して大きな DF をディスクに書き込むか、または pandas/numpy の内部メモリ制限ですか?

raise(remote_exception(res, tb))
    dask.async.MemoryError: 

Traceback
---------
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 267, in execute_task
    result = _execute_task(task, data)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 248, in _execute_task
    args2 = [_execute_task(a, cache) for a in args]
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
    return func(*args2)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/frame.py", line 4061, in apply
    return self._apply_standard(f, axis, reduce=reduce)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/frame.py", line 4179, in _apply_standard
    result = result._convert(datetime=True, timedelta=True, copy=False)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3004, in _convert
    copy=copy)).__finalize__(self)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 2941, in convert
    return self.apply('convert', **kwargs)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 2901, in apply
    bm._consolidate_inplace()
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3278, in _consolidate_inplace
    self.blocks = tuple(_consolidate(self.blocks))
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4269, in _consolidate
    _can_consolidate=_can_consolidate)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4289, in _merge_blocks
    new_values = _vstack([b.values for b in blocks], dtype)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4335, in _vstack
    return np.vstack(to_stack)
  File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/numpy/core/shape_base.py", line 230, in vstack
    return _nx.concatenate([atleast_2d(_m) for _m in tup], 0)

アップデート:

MRocklinの回答に照らして、いくつかの追加情報があります。

プロセスを実行する方法は次のとおりです。

def dask_stats_calc(dfpath,v1,v2,v3...):
    dfpath_ddf = dd.from_pandas(dfpath,npartitions=16,sort=False)
    return dfpath_ddf.apply(calculate_stats,axis=1,args=(dfdaily,v1,v2,v3...)).compute(get=get).stack().reset_index(drop=True)

f_threaded = partial(dask_stats_calc,dfpath,v1,v2,v3...,multiprocessing.get)
f_threaded()

現在、問題はdfpath140 万行の df であるため、140 万行dfpath_ddf.apply()を超えて実行されます。

全体dfpath_ddf.apply()が完了するとdf.to_csv()発生しますが、あなたが言ったように、定期的にディスクに書き込む方がよいでしょう。

問題は、たとえば 20 万行ごとに定期的にディスクに書き込むようなものをどのように実装すればよいかということです。dfpath_ddf200k チャンク (または同様のもの) に分割して、それぞれを順番に実行できると思いますか?

4

1 に答える 1

1

シングルスレッド実行

ディスク上の単一のファイルへの書き込みを待機している間に、タスクが RAM に蓄積されることがあります。このような順次出力の使用は、並列システムでは本質的に注意が必要です。単一のファイルを使用する必要がある場合は、同じ計算をシングル スレッドで試して、違いがあるかどうかを確認することをお勧めします。

with dask.set_options(get=dask.async.get_sync):
    DF.to_csv('out.csv')

複数のファイルに書き込む

代わりに (そしてはるかに好ましい)、多くのCSV ファイルに書き出すこともできます。タスクは、ディスクに書き込み、RAM から削除するために先行タスクが完了するまで待機する必要がないため、スケジューリングがはるかに簡単になります。

DF.to_csv('out.*.csv')

したがって、実行と書き込みを並行して実行するための一般的でかなり堅牢な方法は、計算とto_csv最後に への呼び出しを組み合わせることです。

ddf = dd.from_pandas(df, npartitions=100)
ddf.apply(myfunc).to_csv('out.*.csv')

これにより、データフレームがチャンクに分割され、各チャンクで関数が呼び出され、そのチャンクがディスクに書き込まれ、中間値が削除されてスペースが解放されます。

于 2016-07-18T20:37:53.600 に答える