2

クラスターをセットアップし、基本的なフローをDask喜んで送信しています。Prefect今、私はもっと面白いことをしたいと思っており、Python ライブラリを含むカスタム Docker イメージを取得し、dask クラスターでフロー/タスクを実行します。

私の仮定は、dask クラスター (スケジューラーとワーカー) を独自の Python 環境のままにしておくことができるということでした (すべてのさまざまなメッセージ パッシング ライブラリに一致するバージョンがどこにでもあることを確認した後)。つまり、フローがカスタム内で実行される場合、ライブラリをそれらのマシンに追加する必要はないと思いますstorage。ただし、ストレージを正しく設定していないか、上記を想定するのは安全ではありません。つまり、おそらくカスタム ライブラリでオブジェクトをピクルする場合、Dask クラスターPython ライブラリについて知る必要があります。...という名前の汎用のpythonライブラリがあるとしdataます...

import prefect    
from prefect.engine.executors import DaskExecutor
#see https://docs.prefect.io/api/latest/environments/storage.html#docker
from prefect.environments.storage import Docker

#option 1
storage = Docker(registry_url="gcr.io/my-project/",
                 python_dependencies=["some-extra-public-package"],
                 dockerfile="/path/to/Dockerfile")
#this is the docker build and register workflow!
#storage.build()

#or option 2, specify image directly
storage = Docker(
        registry_url="gcr.io/my-project/", image_name="my-image", image_tag="latest"
    )

#storage.build()

def get_tasks():
    return [
        "gs://path/to/task.yaml"
           ]

@prefect.task
def run_task(uri):
    #fails because this data needs to be pickled ??
    from data.tasks import TaskBase
    task =  TaskBase.from_task_uri(uri)
    #task.run()
    return "done"

with prefect.Flow("dask-example",
                 storage = storage) as flow:
    #chain stuff...
    result =  run_task.map(uri=get_tasks())

executor = DaskExecutor(address="tcp://127.0.01:8080")
flow.run(executor=executor)

このタイプの Docker ベースのワークフローがどのように機能するか、または機能するかどうかを説明できる人はいますか?

4

1 に答える 1