22

Airflow は主に ETL/入札データ関連のジョブに使用されています。ユーザーアクションが将来的に一連の依存タスクをトリガーするビジネスワークフローに使用しようとしています。これらのタスクの一部は、他の特定のユーザー アクションに基づいてクリア (削除) する必要がある場合があります。これを処理する最善の方法は、動的タスク ID を使用することだと思いました。Airflow が動的 DAG ID をサポートしていることを読みました。そこで、DAG ID とタスク ID をコマンド ライン パラメーターとして受け取る単純な Python スクリプトを作成しました。しかし、私はそれを機能させるのに問題が発生しています。dag_id が見つからないというエラーが発生します。誰もこれを試しましたか?コマンドラインでpythonとして実行するスクリプト(tmp.pyと呼びます)のコードは次のとおりです(python tmp.py 820 2016-08-24T22:50:00):

from __future__ import print_function
import os
import sys
import shutil
from datetime import date, datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
execution = '2016-08-24T22:20:00'
if len(sys.argv) > 2  :
   dagid =  sys.argv[1]
   taskid = 'Activate' + sys.argv[1]
   execution = sys.argv[2]
else:
   dagid = 'DAGObjectId'
   taskid = 'Activate'
default_args = {'owner' : 'airflow', 'depends_on_past': False, 'start_date':date.today(), 'email': ['fake@fake.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1}
dag = DAG(dag_id = dagid,
       default_args=default_args,
       schedule_interval='@once',
      )
 globals()[dagid] = dag
task1 = BashOperator(
    task_id = taskid,
    bash_command='ls -l',
    dag=dag)

fakeTask = BashOperator(
    task_id = 'fakeTask',
    bash_command='sleep 5',
    retries = 3,
    dag=dag)
task1.set_upstream(fakeTask)

airflowcmd = "airflow run " + dagid + " " + taskid + "  " + execution
print("airflowcmd = " + airflowcmd)
os.system(airflowcmd)
4

2 に答える 2

23

試行錯誤を繰り返した結果、これにたどり着きました。うまくいけば、それは誰かを助けるでしょう。仕組みは次のとおりです。 テンプレートを介して動的に dags/task を生成するには、イテレータまたは外部ソース (ファイル/データベース テーブル) が必要です。DAG とタスク名を静的に保ち、ID を動的に割り当てて、DAG を他の DAG と区別することができます。この python スクリプトを dags フォルダーに配置します。エアフロー スケジューラを起動すると、ハートビートごとにこのスクリプトが実行され、データベースの DAG テーブルに DAG が書き込まれます。DAG (一意の DAG ID) が既に書き込まれている場合は、単純にスキップされます。スケジューラは、個々の DAG のスケジュールも調べて、実行の準備ができているものを判断します。DAG の実行準備ができている場合、DAG はそれを実行し、そのステータスを更新します。サンプルコードは次のとおりです。

from airflow.operators import PythonOperator
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import sys
import time

dagid   = 'DA' + str(int(time.time()))
taskid  = 'TA' + str(int(time.time()))

input_file = '/home/directory/airflow/textfile_for_dagids_and_schedule'

def my_sleeping_function(random_base):
    '''This is a function that will run within the DAG execution'''
    time.sleep(random_base)

def_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(), 'email_on_failure': False,                
    'retries': 1, 'retry_delay': timedelta(minutes=2)
}
with open(input_file,'r') as f:
    for line in f:
        args = line.strip().split(',')
    if len(args) < 6:
        continue
    dagid = 'DAA' + args[0]
    taskid = 'TAA' + args[0]
    yyyy    = int(args[1])
    mm      = int(args[2])
    dd      = int(args[3])
    hh      = int(args[4])
    mins    = int(args[5])
    ss      = int(args[6])
    dag = DAG(
        dag_id=dagid, default_args=def_args,
        schedule_interval='@once', start_date=datetime(yyyy,mm,dd,hh,mins,ss)
        )

    myBashTask = BashOperator(
        task_id=taskid,
        bash_command='python /home/directory/airflow/sendemail.py',
        dag=dag)

    task2id = taskid + '-X'

    task_sleep = PythonOperator(
        task_id=task2id,
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': 10},
        dag=dag)

    task_sleep.set_upstream(myBashTask)

f.close()
于 2016-09-23T18:04:05.413 に答える