2

分散ジョブで Dask を実行しているときに、スケジューラで次のエラーが発生しました。

distributed.core - ERROR -
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/distributed/core.py", line 269, in write
    frames = protocol.dumps(msg)
  File "/usr/local/lib/python3.4/dist-packages/distributed/protocol.py", line 81, in dumps
    frames = dumps_msgpack(small)
  File "/usr/local/lib/python3.4/dist-packages/distributed/protocol.py", line 153, in dumps_msgpack
    payload = msgpack.dumps(msg, use_bin_type=True)
  File "/usr/local/lib/python3.4/dist-packages/msgpack/__init__.py", line 47, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 231, in msgpack._packer.Packer.pack (msgpack/_packer.cpp:231)
  File "msgpack/_packer.pyx", line 239, in msgpack._packer.Packer.pack (msgpack/_packer.cpp:239)
MemoryError

これは、スケジューラまたはいずれかのワーカーでメモリ不足ですか? または両方??

4

1 に答える 1

2

このエラーの最も一般的な原因は、dask.dataframe を使用した次の例のように、収集しようとするデータが多すぎることです。

df = dd.read_csv('s3://bucket/lots-of-data-*.csv')
df.compute()

これにより、すべてのデータがクラスター全体の RAM にロードされ (これは問題ありません)、スケジューラーを介して結果全体をローカル マシンに戻そうとします (おそらく、数百 GB のデータをすべて処理することはできません)。ワーカーからクライアントへの通信はスケジューラを通過するため、スケジューラはすべてのデータを受信する最初の単一のマシンであり、障害が発生する可能性が最も高いマシンです。

このような場合は、代わりにExecutor.persistメソッドを使用して計算をトリガーし、それをクラスターに残すことをお勧めします。

df = dd.read_csv('s3://bucket/lots-of-data-*.csv')
df = e.persist(df)

通常df.compute()、ローカル セッションで表示したい小さな結果にのみ使用します。

于 2016-07-23T13:43:39.450 に答える