29

Pythonファイルのフォルダー内に一連のPythonタスクがあります:file1.py、file2.py、...

Airflow のドキュメントを読みましたが、DAG で Python ファイルのフォルダーとファイル名を指定する方法がわかりません。

これらの python ファイルを実行したいと思います (Python Operator による Python 関数ではありません)。

タスク 1: file1.py を実行します (いくつかのインポート パッケージを使用)

タスク 2: file2.py を実行します (他のインポート パッケージを使用)

それは役に立ちます。ありがとうございます。それでは、お元気で

4

3 に答える 3

22

「これらの python ファイルを実行したい (Python オペレーターを介した Python 関数ではない)」と尋ねていることは承知しています。しかし、これはおそらくAirflowの使用があなたよりも効果的ではないと思います. 以前に書かれた回答にも混乱が見られるので、これがあなたが望んでいた方法であり、タスクを実行するために私が推奨する方法です。

仮定:

dags/
    my_dag_for_task_1_and_2.py
    tasks/
         file1.py
         file2.py

を避けるためのあなたの要求PythonOperator:

#  my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import BashOperator

with DAG(
    'my_dag_for_task_1_and_2',
    default_args={
        'owner': 'me',
        'start_date': datetime(…),
        …,
    }, 
    schedule_interval='8 * * * *',
) as dag:
    task_1 = BashOperator(
        task_id='task_1', 
        bash_command='/path/to/python /path/to/dags/tasks/file1.py',
    )
    task_2 = BashOperator(
        task_id='task_2', 
        bash_command='/path/to/python /path/to/dags/tasks/file2.py',
    )
    task_1 >> task_2

Airflow 用にゼロから Python を作成したわけではありませんが、以下を使用しPythonOperatorます。

#  my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import PythonOperator
import tasks.file1
import tasks.file2

with DAG(
    'my_dag_for_task_1_and_2',
    default_args={
        'owner': 'me',
        'start_date': datetime(…),
        …,
    }, 
    schedule_interval='8 * * * *',
) as dag:
    task_1 = PythonOperator(
        task_id='task_1', 
        python_callable=file1.function_in_file1,
    )
    task_2 = PythonOperator(
        task_id='task_2', 
        python_callable=file2.function_in_file2,  # maybe main?
    )
    task_1 >> task_2
于 2018-03-26T12:42:43.403 に答える