4

このようなバッチモードでdasがうまく機能することを理解しています

def load(filename):
    ...

def clean(data):
    ...

def analyze(sequence_of_data):
    ...

def store(result):
    with open(..., 'w') as f:
        f.write(result)

dsk = {'load-1': (load, 'myfile.a.data'),
       'load-2': (load, 'myfile.b.data'),
       'load-3': (load, 'myfile.c.data'),
       'clean-1': (clean, 'load-1'),
       'clean-2': (clean, 'load-2'),
       'clean-3': (clean, 'load-3'),
       'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
       'store': (store, 'analyze')}

from dask.multiprocessing import get
get(dsk, 'store')  # executes in parallel
  1. チャンクの数が不明または無限でさえあるストリーミング channel を処理するために dask を使用できますか?
  2. インクリメンタルな方法で計算を実行できますか。たとえば、上記の「分析」ステップで進行中のチャンクを処理できますか?
  3. すべてのデータ チャンクが判明した後でのみ「get」操作を呼び出す必要があります。「get」が呼び出された後に新しいチャンクを追加できますか
4

1 に答える 1

3

編集:以下の新しい回答を参照してください

いいえ

dask 内の現在のタスク スケジューラは、単一の計算グラフを想定しています。このグラフへの動的な追加または削除はサポートされていません。スケジューラは、少量のメモリで大きなグラフを評価するように設計されています。これには、事前にグラフ全体を把握しておくことが重要です。

ただし、これは、異なるプロパティを持つ他のスケジューラを作成することを止めるものではありません。conncurrent.futuresここでの簡単な解決策の 1 つは、単一のマシンまたはdistributed複数のマシンでモジュールを使用することです。

実はそうです

分散スケジューラは完全に非同期で動作するようになり、タスクの送信、いくつかのタスクの待機、追加のタスクの送信、タスクのキャンセル、ワーカーの追加/削除など、すべて計算中に実行できます。これを行うにはいくつかの方法がありますが、おそらく最も簡単なのは、concurrent.futuresここで簡単に説明されている新しいインターフェイスです。

http://dask.pydata.org/en/latest/futures.html

于 2015-11-27T15:37:32.980 に答える