14

コード:

Python バージョン 2.7.x および airflow バージョン 1.5.1

私のdagスクリプトはこれです

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
    t.set_upstream(run_this_first)

そこから、6 つのタスクで DAG を作成していることがわかります。最初のタスク (Start1) が最初に開始され、その後、他の 5 つのタスクがすべて開始されます。

現在、DAG の開始の間に 5 分の時間遅延を与えています

最初のタイプの 6 つのタスクすべてで完全に実行されましたが、5 分後に DAG が再開されません。

1 時間以上経過しても、DAG が再起動されません。私が間違っていたかどうかはわかりません。

誰かが私に何が間違っているのかを指摘できれば、それは本当に素晴らしいことです.私airflow testing clearは同じことが起こるために使用してクリアしようとしました.それは最初のインスタンスを実行した後、ただそこに立っていました.

コマンドラインに表示される唯一のものはGetting all instance for DAG testing

schedule_interval の位置を変更すると、スケジュール間隔なしで並列に実行されます。つまり、5 分で 300 以上のタスク インスタンスが完了します。5分のスケジュール間隔はありません

コード 2:

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
    t.set_upstream(run_this_first)
4

2 に答える 2

11

コード 2 の場合、毎分実行される理由は次のとおりだと思います。

  1. 開始時間は 2015-10-13 00:00 です

  2. スケジュール間隔は 5 分です

  3. スケジューラーのハートビートごと (デフォルトでは 5 秒)、DAG がチェックされます

    • 最初のチェック: 開始日 (最後の実行日が見つからない) + スケジューラー間隔 < 現在時刻? はいの場合、DAG が実行され、最後の実行時間が記録されます。(例: 2015-10-13 00:00 + 5min < 現在?)
    • 次のハートビートの 2 番目のチェック: 最後の実行時間 + スケジューラー間隔 < 現在の時間? その場合、DAG が再度実行されます。
    • ....

ソリューションは、DAG start_date を として設定しますdatetime.now() - schedule_interval

また、デバッグしたい場合:

  1. settings.pyで LOGGINGLEVEL をdebug設定する

  2. のクラスメソッドis_queueable()airflow.models.TaskInstance変更

:

def is_queueable(self, flag_upstream_failed=False):
    logging.debug('Checking whether task instance is queueable or not!')
    if self.execution_date > datetime.now() - self.task.schedule_interval:
        logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now()))
        return False
        ...
于 2015-10-15T13:59:04.567 に答える
4

開始時刻 (2015-10-13 00:00) が現在の時刻よりも短いため、気流のバックフィルがトリガーされます。気流スケジューラーが毎秒検出した 2015-10-13 00:00 (開始日) から実行されますが、実行日は 5 分 (タスク間隔時間) の間です。

ログ名を参照してください。

$tree airflow/logs/testing/
testing/
|-- Orders10
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders11
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders12
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders13
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders14
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
-- Start1
    |-- 2015-10-13T00:00:00
    |-- 2015-10-13T00:05:00
    |-- 2015-10-13T00:10:00
    -- 2015-10-13T00:15:00

ログの作成時刻を表示します。

$ll airflow/logs/testing/Start1
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:00:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:05:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:51 2015-10-13T00:10:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:52 2015-10-13T00:15:00

また、Web UI でタスク インスタンスを確認できます。

気流タスク インスタンス

于 2015-11-09T07:08:40.117 に答える