クラスターをセットアップし、基本的なフローをDask
喜んで送信しています。Prefect
今、私はもっと面白いことをしたいと思っており、Python ライブラリを含むカスタム Docker イメージを取得し、dask クラスターでフロー/タスクを実行します。
私の仮定は、dask クラスター (スケジューラーとワーカー) を独自の Python 環境のままにしておくことができるということでした (すべてのさまざまなメッセージ パッシング ライブラリに一致するバージョンがどこにでもあることを確認した後)。つまり、フローがカスタム内で実行される場合、ライブラリをそれらのマシンに追加する必要はないと思いますstorage
。ただし、ストレージを正しく設定していないか、上記を想定するのは安全ではありません。つまり、おそらくカスタム ライブラリでオブジェクトをピクルする場合、Dask クラスターはPython ライブラリについて知る必要があります。...という名前の汎用のpythonライブラリがあるとしdata
ます...
import prefect
from prefect.engine.executors import DaskExecutor
#see https://docs.prefect.io/api/latest/environments/storage.html#docker
from prefect.environments.storage import Docker
#option 1
storage = Docker(registry_url="gcr.io/my-project/",
python_dependencies=["some-extra-public-package"],
dockerfile="/path/to/Dockerfile")
#this is the docker build and register workflow!
#storage.build()
#or option 2, specify image directly
storage = Docker(
registry_url="gcr.io/my-project/", image_name="my-image", image_tag="latest"
)
#storage.build()
def get_tasks():
return [
"gs://path/to/task.yaml"
]
@prefect.task
def run_task(uri):
#fails because this data needs to be pickled ??
from data.tasks import TaskBase
task = TaskBase.from_task_uri(uri)
#task.run()
return "done"
with prefect.Flow("dask-example",
storage = storage) as flow:
#chain stuff...
result = run_task.map(uri=get_tasks())
executor = DaskExecutor(address="tcp://127.0.01:8080")
flow.run(executor=executor)
このタイプの Docker ベースのワークフローがどのように機能するか、または機能するかどうかを説明できる人はいますか?