完璧なワークフローでは、すべてのスケジュール実行のデータを保持しようとしています。以前と現在のすべての結果のデータを比較する必要があります。Localresult と checkpoint=true を試しましたが、うまくいきません。例えば、
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
import os
import prefect
@task("func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def file_scan():
files = os.listdir(test)
#prefect.context.a = files
return files
schedule = IntervalSchedule(interval=timedelta(seconds=61))
with Flow("Test persist data", schedule) as flow:
a = file_scan()
flow.run()
私のフローは 61 秒/分ごとにスケジュールされています。最初の実行では空の結果が得られる可能性がありますが、2 回目のスケジュールされた実行では、以前のフロー結果を取得して比較する必要があります。誰でもこれを達成するのを手伝ってもらえますか? ありがとう!