2

Daskを使用した分散データ フレームに関するMatthew Rocklin の投稿に基づいて、いくつかの要約統計計算をクラスター全体に分散しようとしています。でクラスターをセットアップすると正常にdcluster ...動作します。手帳の中に、

import dask.dataframe as dd
from distributed import Executor, progress
e = Executor('...:8786')

df = dd.read_csv(...)

私が読んでいるファイルは、すべてのワーカー マシンがアクセスできる NFS マウント上にあります。この時点でdf.head()、例を見ると、すべてが正しく見えます。ブログ投稿から、私はこれを行うことができるはずだと思います:

df_future = e.persist(df)
progress(df_future)
# ... wait for everything to load ...
df_future.head()

しかし、それはエラーです:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-26-8d59adace8bf> in <module>()
----> 1 fraudf.head()

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py in head(self, n, compute)
    358 
    359         if compute:
--> 360             result = result.compute()
    361         return result
    362 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
     35 
     36     def compute(self, **kwargs):
---> 37         return compute(self, **kwargs)[0]
     38 
     39     @classmethod

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
    108                 for opt, val in groups.items()])
    109     keys = [var._keys() for var in variables]
--> 110     results = get(dsk, keys, **kwargs)
    111 
    112     results_iter = iter(results)

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     55     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     56                         cache=cache, queue=queue, get_id=_thread_get_id,
---> 57                         **kwargs)
     58 
     59     return results

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
    479                 _execute_task(task, data)  # Re-execute locally
    480             else:
--> 481                 raise(remote_exception(res, tb))
    482         state['cache'][key] = res
    483         finish_task(dsk, key, state, results, keyorder.get)

AttributeError: 'Future' object has no attribute 'head'

Traceback
---------
  File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 264, in execute_task
    result = _execute_task(task, data)
  File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 246, in _execute_task
    return func(*args2)
  File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py", line 354, in <lambda>
    dsk = {(name, 0): (lambda x, n: x.head(n=n), (self._name, 0), n)}

HDFS ではなく通常のファイル システムからデータ フレームを配布する正しい方法は何ですか?

4

1 に答える 1

1

Dask は、通常の dask ライブラリを使用してデータフレームを作成する場合のデフォルトである、単一マシンのスケジューラを使用しようとしています。次の行で、クラスターを使用するようにデフォルトを切り替えます。

import dask
dask.set_options(get=e.get)
于 2016-04-14T18:57:18.973 に答える