私は m4.4xlarge (64 GB RAM) EC2 ボックスを持っています。私はパンダでダスクを実行しています。次のメモリ エラーが発生します。
約 24 時間の実行後にこれを取得します。これは、タスクが完了するのにかかるおおよその時間であるため、エラーが RAM の不足によるものかどうか、スクリプトの最後にディスク メモリが原因であるかどうかはわかりません。 .to_csv() を使用して大きな DF をディスクに書き込むか、または pandas/numpy の内部メモリ制限ですか?
raise(remote_exception(res, tb))
dask.async.MemoryError:
Traceback
---------
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 267, in execute_task
result = _execute_task(task, data)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 248, in _execute_task
args2 = [_execute_task(a, cache) for a in args]
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
return func(*args2)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/frame.py", line 4061, in apply
return self._apply_standard(f, axis, reduce=reduce)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/frame.py", line 4179, in _apply_standard
result = result._convert(datetime=True, timedelta=True, copy=False)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3004, in _convert
copy=copy)).__finalize__(self)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 2941, in convert
return self.apply('convert', **kwargs)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 2901, in apply
bm._consolidate_inplace()
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3278, in _consolidate_inplace
self.blocks = tuple(_consolidate(self.blocks))
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4269, in _consolidate
_can_consolidate=_can_consolidate)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4289, in _merge_blocks
new_values = _vstack([b.values for b in blocks], dtype)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4335, in _vstack
return np.vstack(to_stack)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/numpy/core/shape_base.py", line 230, in vstack
return _nx.concatenate([atleast_2d(_m) for _m in tup], 0)
アップデート:
MRocklinの回答に照らして、いくつかの追加情報があります。
プロセスを実行する方法は次のとおりです。
def dask_stats_calc(dfpath,v1,v2,v3...):
dfpath_ddf = dd.from_pandas(dfpath,npartitions=16,sort=False)
return dfpath_ddf.apply(calculate_stats,axis=1,args=(dfdaily,v1,v2,v3...)).compute(get=get).stack().reset_index(drop=True)
f_threaded = partial(dask_stats_calc,dfpath,v1,v2,v3...,multiprocessing.get)
f_threaded()
現在、問題はdfpath
140 万行の df であるため、140 万行dfpath_ddf.apply()
を超えて実行されます。
全体dfpath_ddf.apply()
が完了するとdf.to_csv()
発生しますが、あなたが言ったように、定期的にディスクに書き込む方がよいでしょう。
問題は、たとえば 20 万行ごとに定期的にディスクに書き込むようなものをどのように実装すればよいかということです。dfpath_ddf
200k チャンク (または同様のもの) に分割して、それぞれを順番に実行できると思いますか?