Airflow を使用して単純なタスク python を実行しようとしています。
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)
たとえば、試してみると:
気流テスト python_test print 2015-01-01
できます!
def print_context(ds, **kwargs)
今、私は自分の関数を他のpythonファイルに入れたいと思っています。だから私はという別のファイルを作成します: simple_test.py と変更:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)
今、私は再び実行しようとします:
気流テスト python_test print 2015-01-01
そしてOK!それはまだ動作します!
ただし、ファイルを使用してワーカー モジュールなどのモジュールを作成する場合は、それをSimplePython.py
インポート ( from worker import SimplePython
) して試してください。
気流テスト python_test print 2015-01-01
次のメッセージが表示されます。
ImportError: ワーカーという名前のモジュールがありません
質問:
- DAG 定義内にモジュールをインポートすることはできますか?
- Airflow+Celery は、必要なすべての Python ソース ファイルをワーカー ノード間でどのように配布しますか?