
I am using Airbnb's Airflow and have created the simple task shown below. However, the scheduler keeps executing the task no matter what interval I set, hourly or otherwise. Another thing I noticed is that when I set the schedule interval to '@once', the DAG never runs at all.

I followed the conventions described here: http://airflow.readthedocs.org/en/latest/scheduler.html#dag-runs

The simple DAG I am using:

"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 5),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'schedule_test', default_args=default_args, schedule_interval='@hourly')

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

Scheduler output for reference. As you can see, it keeps executing runs over and over, even though my DAG has schedule_interval='@hourly'.

2016-01-06 20:34:37,266 - root - INFO - Starting the scheduler
2016-01-06 20:34:37,267 - root - INFO - Filling up the DagBag from /usr/local/airflow/dags
2016-01-06 20:34:37,267 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:37,272 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:37,281 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:37,288 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:37,291 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:37,318 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 00:00:00: scheduled__2016-01-06T00:00:00, externally triggered: False>
2016-01-06 20:34:37,321 - root - INFO - Getting list of tasks to skip for active runs.
2016-01-06 20:34:37,323 - root - INFO - Checking dependencies on 1 tasks instances, minus 0 skippable ones
2016-01-06 20:34:37,326 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T00:00:00 --local -sd DAGS_FOLDER/test.py 
2016-01-06 20:34:37,347 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:37,347 - root - INFO - Loop took: 0.071298 seconds
2016-01-06 20:34:37,356 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:37,357 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:22.357089
2016-01-06 20:34:42,269 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:42,274 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:42,277 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:42,295 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:42,296 - root - INFO - Loop took: 0.029931 seconds
2016-01-06 20:34:42,309 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:42,310 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:27.310303
2016-01-06 20:34:42,314 - root - INFO - commandairflow run schedule_test print_date 2016-01-06T00:00:00 --local -sd DAGS_FOLDER/test.py 
Logging into: /usr/local/airflow/logs/schedule_test/print_date/2016-01-06T00:00:00
2016-01-06 20:34:47,895 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:47,900 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:47,904 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:47,922 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 00:00:00: scheduled__2016-01-06T00:00:00, externally triggered: False>
2016-01-06 20:34:47,925 - root - INFO - Marking run <DagRun schedule_test @ 2016-01-06 00:00:00: scheduled__2016-01-06T00:00:00, externally triggered: False> successful
2016-01-06 20:34:47,926 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False>
2016-01-06 20:34:47,928 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False>
2016-01-06 20:34:47,937 - root - INFO - Getting list of tasks to skip for active runs.
2016-01-06 20:34:47,940 - root - INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
2016-01-06 20:34:47,943 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T01:00:00 --local -sd DAGS_FOLDER/test.py 
2016-01-06 20:34:47,947 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T02:00:00 --local -sd DAGS_FOLDER/test.py 
2016-01-06 20:34:47,960 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:47,960 - root - INFO - Loop took: 0.067789 seconds
2016-01-06 20:34:47,968 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:47,968 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:32.968940
2016-01-06 20:34:52,901 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:52,906 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:52,909 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:52,942 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False>
2016-01-06 20:34:52,946 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False>
2016-01-06 20:34:52,948 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 03:00:00: scheduled__2016-01-06T03:00:00, externally triggered: False>
2016-01-06 20:34:52,950 - root - INFO - Getting list of tasks to skip for active runs.
2016-01-06 20:34:52,953 - root - INFO - Checking dependencies on 3 tasks instances, minus 0 skippable ones
2016-01-06 20:34:52,961 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T03:00:00 --local -sd DAGS_FOLDER/test.py 
2016-01-06 20:34:52,975 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:52,976 - root - INFO - Loop took: 0.07741 seconds
2016-01-06 20:34:52,982 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:52,983 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:37.983542
2016-01-06 20:34:52,987 - root - INFO - commandairflow run schedule_test print_date 2016-01-06T02:00:00 --local -sd DAGS_FOLDER/test.py 
Logging into: /usr/local/airflow/logs/schedule_test/print_date/2016-01-06T02:00:00
2016-01-06 20:34:58,583 - root - INFO - commandairflow run schedule_test print_date 2016-01-06T01:00:00 --local -sd DAGS_FOLDER/test.py 
Logging into: /usr/local/airflow/logs/schedule_test/print_date/2016-01-06T01:00:00
2016-01-06 20:35:04,215 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:35:04,223 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:35:04,229 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:35:04,263 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False>
2016-01-06 20:35:04,267 - root - INFO - Marking run <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False> successful
2016-01-06 20:35:04,268 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False>
2016-01-06 20:35:04,272 - root - INFO - Marking run <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False> successful
2016-01-06 20:35:04,273 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 03:00:00: scheduled__2016-01-06T03:00:00, externally triggered: False>
2016-01-06 20:35:04,276 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 04:00:00: scheduled__2016-01-06T04:00:00, externally triggered: False>
