0

ワークフローの実行時に実行時にタスクの動的グラフを作成できる Airflow のようなフレームワークを評価しています。つまり、ワークフローを開始する前にタスクとその依存関係を具体的に知りません。グラフのレベル数しか知りません。

ここで説明されているように、Airflow を使い始め、XCom を使用してグラフの状態を維持しています: Airflow で動的ワークフローを作成する適切な方法

また、タスクの依存関係の説明を含む JSON スニペットを XCom 行に保存することで、このアプローチを少し拡張しています。たとえば、次のようになります。

{
    "key": "first_file",
    "tasks" :
        [
            {
              "task_id" : "third_task",
              "dependencies" : ["first_task", "second_task"]
            }
        ]
}

DAG を再実行する必要がないことに注意してください。私の DAG は外部でスケジュールされることを意図しており、最初の DagRun が完了すると、その後新しいタスクが削除/削除/変更されることはありません。再実行が必要な場合は、代わりに新しい DAG を作成します。

私が使用している手法は次のとおりです。2 つのタスクで DAG を作成します。1 つはセンサーです (これにより、DagRun が最後まで常に実行状態にあることが保証されます)。

class WaitTasksSensor(BaseSensorOperator):
    ...
    def poke(self, context):
        files = os.list_dir(MY_DIR)

        for f in files:
           filename = os.path.join(MY_DIR, f)
           json_file = open(filename).read()
           json_dict = json.loads(json_file)
           key = json_dict["key"]
           self.xcom_push(context, key + "_" + self.dag_id, json_file)

        # This sensor completes successfully only when the "end" task appears in the graph
        last_task_id = "end_" + self.dag_id
        return last_task_id in self.dag.task_ids


def create_dags(dag_id):
    with DAG(dag_id, schedule_interval=None):
        first = DummyOperator(task_id="first_" + dag_id)

        wait_sensor = WaitTasksSensor(task_id="wait_sensor_" + dag_id, mode="reschedule")
        first >> wait_sensor

        pull_tasks(wait_sensor) # Code below
dag = create_dags("dag_1")

Sensor がタスクとその依存関係を表す JSON ファイル (フォルダーに入れられ続ける) をプッシュする間、私は DAG コードで XCom からタスクをプルしようとします。

def pull_tasks(previous_task):
   current_dag = previous_task.dag
   dag_id = current_dag.dag_id
   last_run = current_dag.get_last_dagrun(include_externally_triggered=True)
  
   if not last_run:
       return

   last_run_date = last_run.execution_date
   task_instances = previous_task.get_task_instances(start_date=last_run_date)

   if not task_instances:
       return

   last_task_instance = task_instance[-1]

   json_ids = [...]

   for json_id in json_ids:
       task_graph_json = last_task_instance.xcom_pull(task_ids=previous_task.task_id,
                                                      key=json_id + "_" + dag_id,
                                                      dag_id=dag_id)
    
       if task_graph:
           task_graph_deserialized = json.loads(task_graph_json)
           tasks = task_graph_deserialized["tasks"]
           create_dynamic_tasks(dag, task_dicts)

def create_dynamic_tasks(dag, task_dicts):
   dag_id = dag.dag_id

   for task_dict in task_dicts:
       task = DummyOperator(task_id=task_id + "_" + dag_id,
                            dag=dag)
       dependencies = task_dict["dependencies"]
 
       for predecessor_id in dependencies:
           predecessor = dag.get_task(predecessor_id + "_" + dag_id)
           predecessor >> task

私の質問は、Airflow はそのようなユースケースの有効な手段ですか? それとも、主なユース ケース (つまり、実行時に生成されない静的タスクを使用した固定ワークフロー) から少し拡張しすぎているのでしょうか?

このアプローチは、たとえば、数万の DAG と数十万のタスクに対応できますか? または、これをより簡単な方法で達成するための他の同様のツールはありますか?

4

1 に答える 1