1

完璧なワークフローでは、すべてのスケジュール実行のデータを保持しようとしています。以前と現在のすべての結果のデータを比較する必要があります。Localresult と checkpoint=true を試しましたが、うまくいきません。例えば、

from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
import os
import prefect

@task("func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def file_scan():
    files = os.listdir(test) 
    #prefect.context.a = files
    return files

schedule = IntervalSchedule(interval=timedelta(seconds=61))

with Flow("Test persist data", schedule) as flow:
    a = file_scan()
flow.run()

私のフローは 61 秒/分ごとにスケジュールされています。最初の実行では空の結果が得られる可能性がありますが、2 回目のスケジュールされた実行では、以前のフロー結果を取得して比較する必要があります。誰でもこれを達成するのを手伝ってもらえますか? ありがとう!

4

1 に答える 1