このようなバッチモードでdasがうまく機能することを理解しています
def load(filename):
...
def clean(data):
...
def analyze(sequence_of_data):
...
def store(result):
with open(..., 'w') as f:
f.write(result)
dsk = {'load-1': (load, 'myfile.a.data'),
'load-2': (load, 'myfile.b.data'),
'load-3': (load, 'myfile.c.data'),
'clean-1': (clean, 'load-1'),
'clean-2': (clean, 'load-2'),
'clean-3': (clean, 'load-3'),
'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
'store': (store, 'analyze')}
from dask.multiprocessing import get
get(dsk, 'store') # executes in parallel
- チャンクの数が不明または無限でさえあるストリーミング channel を処理するために dask を使用できますか?
- インクリメンタルな方法で計算を実行できますか。たとえば、上記の「分析」ステップで進行中のチャンクを処理できますか?
- すべてのデータ チャンクが判明した後でのみ「get」操作を呼び出す必要があります。「get」が呼び出された後に新しいチャンクを追加できますか