0

私のタスクコードは次のとおりです。

from airflow.models import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
rootdir = "/tmp/airflow"
default_args = {
    'owner': 'max',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['max@test.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('test3', default_args=default_args,
                    schedule_interval='*/2 * * * *')
t1 = BashOperator(
    task_id='test3-task1',
    bash_command='date  >> {rootdir}/test3-task1.out'.format(rootdir=rootdir),
    owner='max',
    dag=dag)
t2 = BashOperator(
    task_id='test3-task2',
    bash_command='whoami',
    retries=3,
    owner='max',
    dag=dag)

次に、Linux の「airflow」ユーザーでコマンド「airflow test test3 test3-task2 2016-07-25」を実行します。「whoami」を出力した結果が「気流」です。ただし、出力結果がタスクの「所有者」であることを願っています。

私の間違いは何ですか?

ありがとう

以下が出力結果です。

[2016-07-25 11:22:37,716] {bash_operator.py:64} 情報 - 一時的なスクリプトの場所:/tmp/airflowtmpoYNJE8//tmp/airflowtmpoYNJE8/test3-task2U1lpom

[2016-07-25 11:22:37,716] {bash_operator.py:65} INFO - 実行中のコマンド: whoami

[2016-07-25 11:22:37,722] {bash_operator.py:73} INFO - 出力:

[2016-07-25 11:22:37,725] {bash_operator.py:77} 情報 -気流

[2016-07-25 11:22:37,725] {bash_operator.py:80} 情報 - コマンドはリターン コード 0 で終了しました

4

2 に答える 2

0

あなたがやろうとしていることがサポートされているようには見えません。残念ながら、 bash_operatorBaseOperatorの両方のソース コードを見ると、どちらもタスクを実行する前にユーザーを変更しようとはしていません。

于 2016-10-19T20:33:02.277 に答える