6

私が直面している現在の問題は、非循環依存グラフで実行する必要があるタスクによってそれぞれ処理および更新する必要があるドキュメントが MongoDB コレクションにあることです。上流のタスクがドキュメントの処理に失敗した場合、そのドキュメントは前提条件の情報で更新されていないため、従属タスクはそのドキュメントを処理できません。

Airflow を使用する場合、次の 2 つの解決策があります。

  1. ドキュメントごとに DAG をトリガーし、ドキュメント ID を で渡します--conf。これに関する問題は、これが Airflow の意図された使用方法ではないことです。スケジュールされたプロセスを実行することは決してなく、ドキュメントがコレクションにどのように表示されるかに基づいて、1 日あたり 1440 の Dagrun を作成します。

  2. その期間のコレクションで作成されたすべてのドキュメントを処理するために、その期間ごとに DAG を実行します。これは Airflow が期待される動作方法に従いますが、問題は、タスクが 1 つのドキュメントの処理に失敗した場合、依存するタスクが他のドキュメントを処理できないことです。また、ドキュメントがタスクによって処理されるのに他のドキュメントよりも時間がかかる場合、それらの他のドキュメントは、その単一のドキュメントが DAG を続行するのを待っています。

エアフローよりも良い方法はありますか? または、現在見ている 2 つの方法よりも Airflow でこれを処理するためのより良い方法はありますか?

4

5 に答える 5

0

より重いタスクを並行して処理し、成功した操作を下流にフィードします。私の知る限り、成功をダウンストリーム タスクに非同期的にフィードすることはできないため、ダウンストリームに移動するまですべてのスレッドが終了するのを待つ必要がありますが、これは、レコードごとに 1 つの DAG を生成するよりも十分に受け入れられます。これらの行で:

タスク 1: いくつかのタイムスタンプ (冪等性を思い出してください) とフィード タスク (つまり、xcom 経由) による mongo フィルタリングを読み取ります。

タスク 2: PythonOperator を使用して並列処理を実行するか、K8sPod を使用してさらに優れた処理を実行します。つまり、次のようになります。

def thread_fun(ret):
    while not job_queue.empty():
        job = job_queue.get()
        try:        
            ret.append(stuff_done(job))
        except:
            pass
    job_queue.task_done()
    return ret

# Create workers and queue
threads = []
ret = [] # a mutable object
job_queue = Queue(maxsize=0)

for thr_nr in appropriate_thread_nr:
    worker = threading.Thread(
        target=thread_fun,
        args=([ret])
    )
    worker.setDaemon(True)
    threads.append(worker)

# Populate queue with jobs
for row in xcom_pull(task_ids=upstream_task):
    job_queue.put(row)

# Start threads
for thr in threads:
    thr.start()

# Wait to finish their jobs
for thr in threads:
    thr.join()

xcom_push(ret)

タスク 3: 前のタスクから派生したことをさらに実行する、など

于 2020-01-17T19:49:00.637 に答える
0

trigger_rule「all_success」から「all_done」に変更できます

https://github.com/apache/airflow/blob/62b21d747582d9d2b7cdcc34a326a8a060e2a8dd/airflow/example_dags/example_latest_only_with_trigger.py#L40

また、trigger_rule「one_failed」に設定して失敗したドキュメントを処理するブランチを作成して、失敗したドキュメントのプロセスを別の方法で移動することもできます (たとえば、「失敗した」フォルダーに移動して通知を送信します)。

于 2019-10-16T18:11:38.007 に答える