私たちは最近、Airflow を「データ ワークフロー」エンジンとして採用しようとしました。ほとんどのことは理解できましたが、DAG をトリガーするタイミングをスケジューラがどのように計算するかについては、まだ灰色の領域にいます。
この単純なダグを見てください:
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime.now()
}
with DAG('test_dag1', schedule_interval="5 * * * *", default_args=dag_options) as dag:
task1 = BashOperator(
task_id='task1',
bash_command='date',
dag=dag)
スケジュールはこれを取得しますが、実行しません。ここで、「start_date」を次のように変更すると:
datetime(year=xxxx,month=yyyy=day=zzzz)
xxxx、yyyy、zzzz は今日の日付で、実行が開始されます。この原因は、スケジューラがソース DAG フォルダーからこの DAG を再読み取りし続け、毎回 datetime.now() を実行し、開始日が現在キューに入れられている日付と異なることに気付き、この DAG を再追加して再スケジュール/実行日を前倒ししますか (私のdag_dir_list_intervalは 300 です)?
また、気流では、私が理解しているように、dag の一時停止が解除される (または dags_are_paused_at_creation = False で追加される) と、スケジューラーは次のように実行をスケジュールします。
- 1 日目の実行: 直後 (開始日 + 間隔)
- 2 日目の実行: (開始日 + (間隔 * 2)) の直後
- 3 日目の実行: (開始日 + (間隔 * 3)) の直後
これは正しい仮定ですか?
更新 (2017 年 7 月 30 日)
上記の仮定に基づいて、今日 (07/30/2017) にこの DAG を作成しました。
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date':
datetime(year=2017,month=7,day=30,hour=20,minute=10)
}
with DAG('test_dag_100', schedule_interval="*/10 * * * *",
default_args=dag_options) as dag:
task1 = BashOperator(
task_id='task_100',
bash_command='date',
dag=dag)
(UTC)に開始する必要があります:
- 2017/7/30 20:20:00
- 2017/7/30 20:30:00
- 2017/7/30 20:40:00
残念ながら、これは起こっていません。これが私のダッシュボードのスクリーンショットです。
20:21:00 に DAG が実行されなかった理由を誰か説明できますか? 20:31:00 以降、まだ実行されませんでした...ここで何が欠けていますか?
ちなみに、何らかの理由で、ダッシュボードから手動で DAG を開始するたびに、DAG が「実行中」の段階にあることに気付きました。どうしてこれなの?手動で開始することは、開始タイミング オプション (start_date/interval/etc) と関係がありますか??
あなたが提供できる説明をありがとう