0

私はprefectで作業を開始し、結果をGoogleクラウドストレージに保存しようとしています:

import prefect
from prefect.engine.results import GCSResult
from prefect.run_configs import DockerRun, LocalRun
from prefect.storage import Docker, Local

@prefect.task(checkpoint=True, result=GCSResult(bucket="redacted"))
def task1():
    return 1


storage = Local(...)
run_config = LocalRun()

with prefect.Flow(
    "myflow", 
    storage=storage, 
    run_config=run_config
) as flow:
    results = task1()

flow.run()

GOOGLE_APPLICATION_CREDENTIALS 環境変数をキーに設定していれば、すべて正常に動作します。

ただし、フローをドッキングしようとすると、いくつかの問題が発生します。

storage = Docker(...)
run_config = DockerRun(dockerfile="DockerFile")

with prefect.Flow(
    "myflow", 
    storage=storage, 
    run_config=run_config
) as flow:
    ... # Same definition as previously

flow.register()

このような場合、docker エージェントを使用してフローを実行しようとすると (フローが登録されたのと同じマシン上か別のマシン上にあるかに関係なく)、次のエラーが発生します。

google.auth.exceptions.DefaultCredentialsError: Could not automatically determine credentials.
Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. 
For more information, please see https://cloud.google.com/docs/authentication/getting-started

ドキュメントに従ってGCP_CREDENTIALS、Prefect クラウドにシークレットを設定しようとしましたが、同じエラーが引き続き発生します。

結果を別のタスクに保存しようとしましたGCSUploadが、まだ同じエラーが発生しています。

私が見る 1 つの解決策は、DockerFile を介して Docker イメージ内に資格情報をパッケージ化することですが、これは、Prefect シークレットを使用する必要があるユース ケースであるべきだと思います。

4

1 に答える 1

1

PrefectSecrettaskを使用して資格情報を取得する何かを考え出しました。

結果をGCSに直接保存する追加のGCSUploadタスクを作成する必要がありました。task1

私の最終的なコードは次のようになります。


import prefect
from prefect.tasks.gcp.storage import GCSUpload
from prefect.tasks.secrets import PrefectSecret
from prefect.run_configs import DockerRun
from prefect.storage import Docker

retrieve_gcp_credentials = PrefectSecret("GCP_CREDENTIALS")


@prefect.task(checkpoint=True, result=GCSResult(bucket="redacted"))
def task1():
    return "1"

save_results_to_gcp = GCSUpload(bucket="redacted")

storage = Docker()
run_config = DockerRun()

with prefect.Flow(
    "myflow", 
    storage=storage, 
    run_config=run_config
) as flow:
    credentials = retrieve_gcp_credentials()
    results = task1()
    save_results_to_gcp(results, credentials=credentials)

flow.run()

task1(タスクは文字列またはバイトしかアップロードできないため、によって返される値のタイプも変更する必要があったことに注意してください)

これは私のユースケースには十分です(GCSで結果を保持するだけです)が、GCSResultキャッシングにも役立つため、使用方法を誰かが知っている場合は、質問を開いたままにします。

于 2021-01-19T15:54:08.430 に答える