問題タブ [airflow-scheduler]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - Airflow で特定の基準が満たされるまで待機を実装する効率的な方法
Airflowのセンサー - 特定の基準が満たされるまで実行し続ける特定のタイプのオペレーターですが、ワーカー スロットが完全に消費されます。人々がこれを実装するためのより効率的な方法を確実に使用できるかどうかは興味深い.
私の心にいくつかのアイデア
- プールを使用して、センサーに割り当てられるワーカー スロットの数を制限する
- 下流のすべてのタスクをスキップし、外部トリガーを介してクリアして再開する
- DAG の実行を一時停止し、外部トリガーを介して再開します
その他の関連リンク:
airflow - Airflow で DAG を再起動する方法は?
DB のシャットダウンが原因で、多くのステップを含む DAG の 1 つが途中で停止しました。DAG を中断したところから開始したいのですが、できることは、DAG の個々のタスクを 1 つずつ開始することだけです。既に完了したタスクの成功に基づいて、中断したところから DAG を開始するように Airflow に指示する方法はありますか?
最初のタスクの 1 つが終了し、残りがキューに入れられているか、ステータスがないサンプルを次に示します。
個々のタスク ( runme_1
) をクリックすると、再起動できます。
ただし、依存関係が満たされていないタスク ( などrun_after_loop
) を開始しようとすると、次のエラー メッセージが表示さ
れます。
任意のタスクをクリックしてキューに入れ、すべての依存関係を正しい順序で実行できるようにしたいと考えています。それは可能ですか?
Airflow 1.9.0 を実行しています。
airflow - 最新の Airflow DAG のみを実行する
Airflow を使用して非常に単純な ETL DAG を実行したいとします。DB2 での最後の挿入時刻をチェックし、DB1 から DB2 に新しい行があればロードします。
理解できる要件がいくつかあります。
- 1 時間ごとにスケジュールされ、最初の数回の実行は 1 時間以上続きます
- 例えば。最初の実行では 1 か月のデータを処理する必要があり、72 時間続きます。
- 2 回目の実行では最後の 72 時間を処理する必要があり、7.2 時間続きます。
- 3 番目のプロセスは 7.2 時間かかり、1 時間以内に終了します。
- それ以降は毎時実行されます。
- DAG の実行中は、次の DAG を開始せずにスキップします。
- トリガー イベントを過ぎても DAG が開始されなかった場合は、その後開始しないでください。
- 他の DAG もあり、DAG は個別に実行する必要があります。
これらのパラメーターと演算子は少しわかりにくいと思いますが、それらの違いは何ですか?
depends_on_past
catchup
backfill
LatestOnlyOperator
どちらを使用する必要があり、どの LocalExecutor を使用する必要がありますか?
Ps。すでに非常によく似たスレッドがありますが、使い尽くすことはありません。
python - Airflow ワーカーの構成
私はAirflowの初心者です。この記事https://stlong0521.github.io/20161023%20-%20Airflow.htmlを参照して、Celery Executor を使用して Airflow の分散モードをセットアップしようとしています。
仕様について詳しく説明する前に、PostgreSQL を別のインスタンスにインストールしたことを確認したいと思います。
セットアップの仕様は次のとおりです。
Airflow コア/サーバー コンピューター
- パイソン3.5
- エアフロー (AIRFLOW_HOME = ~/airflow)
- セロリ
- psycogp2
- RabbitMQ
airflow.cfg で行われた構成:
実行されたテスト:
Airflow ワーカー コンピューター
以下がインストールされています:
- Python 3.5 と
- エアフロー (AIRFLOW_HOME = ~/airflow)
- セロリ
- psycogp2
airflow.cfg で行われた構成は、サーバーとまったく同じです。
ワーカー マシンで実行されたコマンドからの出力:
エアフローフラワーを実行する場合:
Airflow Core マシンで DAG を渡しています。また、DAG が処理するサンプル データ (Excel シート) を同じコア マシンにコピーしました。
私のワーカーログ
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1
今私のクエリは
1) DAG フォルダもワーカー コンピュータにコピーする必要がありますか?
2) 現在、ワーカー コンピューターに dag フォルダーをコピーしていないため、ワーカー プロセスがタスクを取得するのを確認できません。
私が間違いを犯している場所と、ワーカープロセスがプロセスを取得する方法を教えてください。