10 個のワーカーと合計 40 個のコアを持つダッシュ グリッドがあるとします。これは共有グリッドなので、自分の作業で完全に飽和させたくありません。実行するタスクが 1000 個あり、一度に最大 20 個のタスクを送信 (およびアクティブに実行) したいと考えています。
具体的に言うと、
from time import sleep
from random import random
def inc(x):
from random import random
sleep(random() * 2)
return x + 1
def double(x):
from random import random
sleep(random())
return 2 * x
>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>
キューのシステムをセットアップすると
>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)
これは機能しますが、すべてのタスクがグリッドにダンプされ、飽和状態になります。理想的には、次のことができます。
e.scatter(input_q, max_submit=20)
ここのドキュメントの例では、maxsize
キューを使用できるようです。しかし、ユーザーの観点から見ると、私はまだバックプレッシャーに対処しなければならないようです。理想的 dask
には、これを自動的に処理します。