6

ワークフローの残りの部分を続行する前に、特定の条件に達するまでタスクを何度もループしたいと考えています。

私がこれまでに持っているのはこれです:

# Loop task
class MyLoop(Task):
    def run(self):
        loop_res = prefect.context.get("task_loop_result", 1)
        print (loop_res)
        if loop_res >= 10:
            return loop_res
        raise LOOP(result=loop_res+1)

しかし、私が理解している限り、これは複数のタスクでは機能しません。さらに戻って、一度に複数のタスクをループする方法はありますか?

4

2 に答える 2

5

解決策は、1 つ以上のパラメーターを使用して新しいフローを作成し、flow.run() を呼び出す単一のタスクを作成することです。例えば:

class MultipleTaskLoop(Task):
    def run(self):
        # Get previous value
        loop_res = prefect.context.get("task_loop_result", 1)
        
        # Create subflow
        with Flow('Subflow', executor=LocalDaskExecutor()) as flow:
            x = Parameter('x', default = 1)
            loop1 = print_loop()
            add = add_value(x)
            loop2 = print_loop()
            loop1.set_downstream(add)
            add.set_downstream(loop2)

        # Run subflow and extract result
        subflow_res = flow.run(parameters={'x': loop_res})
        new_res = subflow_res.result[add]._result.value

        # Loop
        if new_res >= 10:
            return new_res
        raise LOOP(result=new_res)

ここでは、print_loop単に「ループ」を出力に出力し、add_value受け取った値に 1 を追加します。

于 2021-07-23T11:24:31.253 に答える