1

非常に簡単に..これはバグですか、それとも何か不足していますか? tmp_j単品で6仕切りのバッグです。ただし、大きなバッグでも同様の反応が得られます。

この特定のバッグは次のもので構成されています。

>>> tmp_j = jnode_b.filter(lambda r: (r['node']['attrib']['uid'] == '8909') & 
               (r['node']['attrib']['version'] == '1')).pluck('node').pluck('attire')

次のようになります。

>>> tmp_j.compute()

[{'changeset': '39455176',
  'id': '4197394169',
  'lat': '53.4803608',
  'lon': '-113.4955328',
  'timestamp': '2016-05-20T16:43:02Z',
  'uid': '8909',
  'user': 'mvexel',
  'version': '1'}]

再度、感謝します..

>>> tmp_j.repartition(1).map(json.dumps).to_textfiles('tmpA*.json')

正しく動作します(ファイルを書き込みます)が、

>>> tmp_j.map(json.dumps).to_textfiles('tmpA*.json')

与える

StopIteration                             Traceback (most recent call last)
<ipython-input-28-a77a33e2ff26> in <module>()
----> 1 tmp_j.map(json.dumps).to_textfiles('tmp*.json')

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(self, path, name_function, compression, encoding, compute)
    469     def to_textfiles(self, path, name_function=str, compression='infer',
    470                      encoding=system_encoding, compute=True):
--> 471         return to_textfiles(self, path, name_function, compression, encoding, compute)
    472 
    473     def fold(self, binop, combine=None, initial=no_default, split_every=None):

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(b, path, name_function, compression, encoding, compute)
    167     result = Bag(merge(b.dask, dsk), name, b.npartitions)
    168     if compute:
--> 169         result.compute()
    170     else:
    171         return result

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
     35 
     36     def compute(self, **kwargs):
---> 37         return compute(self, **kwargs)[0]
     38 
     39     @classmethod

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
    108                 for opt, val in groups.items()])
    109     keys = [var._keys() for var in variables]
--> 110     results = get(dsk, keys, **kwargs)
    111 
    112     results_iter = iter(results)

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/multiprocessing.py in get(dsk, keys, optimizations, num_workers, func_loads, func_dumps, **kwargs)
     76         # Run
     77         result = get_async(apply_async, len(pool._pool), dsk3, keys,
---> 78                            queue=queue, get_id=_process_get_id, **kwargs)
     79     finally:
     80         if cleanup:

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
    486                 _execute_task(task, data)  # Re-execute locally
    487             else:
--> 488                 raise(remote_exception(res, tb))
    489         state['cache'][key] = res
    490         finish_task(dsk, key, state, results, keyorder.get)

StopIteration: 

Traceback
---------
  File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 267, in execute_task
    result = _execute_task(task, data)
  File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 249, in _execute_task
    return func(*args2)
  File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py", line 1024, in write
    firstline = next(data)

注:それは

>>> tmp_b = db.from_sequence(tmp_j,partition_size=3)
>>> tmp_b.map(json.dumps).to_textfiles('tmp*.json')

正常に動作します(ただし、繰り返しますtmp_b.npartitions == 1)。

洞察に感謝します-ソースを確認しましたが、スマート/怠惰な比率が低すぎることに気付きました。

これを把握したと確信したら、ドキュメントを提出します。

4

1 に答える 1