12

10 個のワーカーと合計 40 個のコアを持つダッシュ グリッドがあるとします。これは共有グリッドなので、自分の作業で完全に飽和させたくありません。実行するタスクが 1000 個あり、一度に最大 20 個のタスクを送信 (およびアクティブに実行) したいと考えています。

具体的に言うと、

from time import sleep
from random import random

def inc(x):
    from random import random
    sleep(random() * 2)
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x

>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>

キューのシステムをセットアップすると

>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)

これは機能しますが、すべてのタスクがグリッドにダンプされ、飽和状態になります。理想的には、次のことができます。

e.scatter(input_q, max_submit=20)

ここのドキュメントの例では、maxsizeキューを使用できるようです。しかし、ユーザーの観点から見ると、私はまだバックプレッシャーに対処しなければならないようです。理想的 daskには、これを自動的に処理します。

4

2 に答える 2

8

使用するmaxsize=

あなたはとても近くにいます。、 、およびのすべてはscatter、と同じキーワード引数を取ります。したがって、単純なワークフローは次のようになります。gathermapmaxsize=Queue

from time import sleep

def inc(x):
    sleep(1)
    return x + 1

your_input_data = list(range(1000))

from queue import Queue              # Put your data into a queue
q = Queue()
for i in your_input_data:
    q.put(i)

from dask.distributed import Executor
e = Executor('127.0.0.1:8786')        # Connect to cluster


futures = e.map(inc, q, maxsize=20)  # Map inc over data
results = e.gather(futures)          # Gather results

L = []
while not q.empty() or not futures.empty() or not results.empty():
    L.append(results.get())  # this blocks waiting for all results

qfutures、およびはすべてresultsPython Queue オブジェクトです。およびキューqresultsは制限がないため、可能な限り貪欲に引き込みます。ただし、futuresキューの最大サイズは 20 であるため、常に 20 個の先物しか実行できません。主要な未来が完了すると、それはすぐに収集関数によって消費され、その結果がresultsキューに入れられます。これにより、スペースが解放されfutures、別のタスクがサブミットされます。

これはまさにあなたが望んでいたものではないことに注意してください。これらのキューは順序付けられているため、先物はキューの先頭にある場合にのみ取り出されます。最初のものを除いてすべての進行中の先物が終了した場合、それらはまだキューに留まり、スペースを占有します。maxsize=この制約を考えると、希望する項目よりも少し多めに選択することをお勧めします20

これを拡張する

ここではmap->gather、間にロジックがない単純なパイプラインを実行します。ここに他のmap計算を入れたり、先物をキューから引き出して、自分でカスタム作業を行うこともできます。上記の型から抜け出すのは簡単です。

于 2016-08-12T15:40:15.760 に答える
1

github に投稿されたソリューションは非常に役に立ちました - https://github.com/dask/distributed/issues/864

解決:

inputs = iter(inputs)
futures = [c.submit(func, next(inputs)) for i in range(maxsize)]
ac = as_completed(futures)

for finished_future in ac:
    # submit new future 
    try:
        new_future = c.submit(func, next(inputs))
        ac.append(new_future)
    except StopIteration:
        pass
    result = finished_future.result() 
    ... # do stuff with result

クエリ:

ただし、タスクを調整できるワーカーを特定するために、client.has_what() API を利用しようとしています。ステータス UI ページに表示されているように、ワーカーの負荷がすぐに反映されないようです。has_what がデータを反映するのにかなりの時間がかかる場合があります。

UIが利用しているものと同様のスロットル範囲を決定するために使用できる無料のワーカーの数を決定するために使用できる別のAPIはありますか?

于 2018-03-27T22:12:48.673 に答える