11

私はAirflowの初心者です。この記事https://stlong0521.github.io/20161023%20-%20Airflow.htmlを参照して、Celery Executor を使用して Airflow の分散モードをセットアップしようとしています。

仕様について詳しく説明する前に、PostgreSQL を別のインスタンスにインストールしたことを確認したいと思います。

セットアップの仕様は次のとおりです。

Airflow コア/サーバー コンピューター

  • パイソン3.5
    • エアフロー (AIRFLOW_HOME = ~/airflow)
    • セロリ
    • psycogp2
  • RabbitMQ

airflow.cfg で行われた構成:

sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow

実行されたテスト:

RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)

Airflow ワーカー コンピューター

以下がインストールされています:

  • Python 3.5 と
    • エアフロー (AIRFLOW_HOME = ~/airflow)
    • セロリ
  • psycogp2

airflow.cfg で行われた構成は、サーバーとまったく同じです。

sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow

ワーカー マシンで実行されたコマンドからの出力:

エアフローフラワーを実行する場合:

[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:password@192.168.1.12:5672//
[I 180219 14:58:15 command:147] Registered tasks: 
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:password@192.168.1.12:5672//

Airflow Core マシンで DAG を渡しています。また、DAG が処理するサンプル データ (Excel シート) を同じコア マシンにコピーしました。

私のワーカーログ raise CalledProcessError(retcode, cmd) subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1

今私のクエリは

1) DAG フォルダもワーカー コンピュータにコピーする必要がありますか?

2) 現在、ワーカー コンピューターに dag フォルダーをコピーしていないため、ワーカー プロセスがタスクを取得するのを確認できません。

私が間違いを犯している場所と、ワーカープロセスがプロセスを取得する方法を教えてください。

4

4 に答える 4

3

構成ファイルは問題ないようです。ご想像のとおり、実際にはすべてのワーカーが DAG フォルダーのコピーを必要としています。gitなどを使用して、それらを同期して最新の状態に保つことができます。

于 2018-02-21T18:16:49.733 に答える
0

これは少し遅れていますが、既存の回答から、「手動」展開(git/scpなどを介して)以外にDAGを共有する方法がないように見えるため、それでも誰かを助けるかもしれませんが、方法があります。

Airflow はpickling ( -pCLI またはcommand: scheduler -pdocker-compose ファイルのパラメーター) をサポートします。これにより、DAG をサーバー/マスターにデプロイし、それらをシリアル化してワーカーに送信できます (したがって、複数の DAG をデプロイする必要はありません)。 DAG の同期が取れていない問題を回避できます)。

酸洗対応CeleryExecutorです。

Pickling にはいくつかの制限があります。特に、クラスと関数の実際のコードはシリアル化されません (完全修飾名のみがシリアル化されます)。そのため、DAG を参照していないコードを参照してシリアル化解除しようとすると、エラーが発生します。ターゲット環境にある。pickle の詳細については、https ://docs.python.org/3.3/library/pickle.html を参照してください。

于 2019-10-03T12:22:50.550 に答える