コード:
Python バージョン 2.7.x および airflow バージョン 1.5.1
私のdagスクリプトはこれです
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
そこから、6 つのタスクで DAG を作成していることがわかります。最初のタスク (Start1) が最初に開始され、その後、他の 5 つのタスクがすべて開始されます。
現在、DAG の開始の間に 5 分の時間遅延を与えています
最初のタイプの 6 つのタスクすべてで完全に実行されましたが、5 分後に DAG が再開されません。
1 時間以上経過しても、DAG が再起動されません。私が間違っていたかどうかはわかりません。
誰かが私に何が間違っているのかを指摘できれば、それは本当に素晴らしいことです.私airflow testing clear
は同じことが起こるために使用してクリアしようとしました.それは最初のインスタンスを実行した後、ただそこに立っていました.
コマンドラインに表示される唯一のものはGetting all instance for DAG testing
schedule_interval の位置を変更すると、スケジュール間隔なしで並列に実行されます。つまり、5 分で 300 以上のタスク インスタンスが完了します。5分のスケジュール間隔はありません
コード 2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)