Google cloud-composer で apache-airflow を使用して、Google AI プラットフォームでモデルのトレーニングを行うバッチ処理をスケジュールしようとしています。MLEngineTrainingOperator で master_type を指定できないこの質問で説明しているように、気流演算子を使用できませんでした
コマンドラインを使用して、ジョブを正常に起動できました。だから今私の問題は、このコマンドを気流に統合することです。
BashOperator を使用してモデルをトレーニングできますが、バージョンを作成してデフォルトとして設定する前に、ジョブが完了するのを待つ必要があります。この DAG は、ジョブが完了する前にバージョンを作成します
bash_command_train = "gcloud ai-platform jobs submit training training_job_name " \
"--packages=gs://path/to/the/package.tar.gz " \
"--python-version=3.5 --region=europe-west1 --runtime-version=1.14" \
" --module-name=trainer.train --scale-tier=CUSTOM --master-machine-type=n1-highmem-16"
bash_train_operator = BashOperator(task_id='train_with_bash_command',
bash_command=bash_command_train,
dag=dag,)
...
create_version_op = MLEngineVersionOperator(
task_id='create_version',
project_id=PROJECT,
model_name=MODEL_NAME,
version={
'name': version_name,
'deploymentUri': export_uri,
'runtimeVersion': RUNTIME_VERSION,
'pythonVersion': '3.5',
'framework': 'SCIKIT_LEARN',
},
operation='create')
set_version_default_op = MLEngineVersionOperator(
task_id='set_version_as_default',
project_id=PROJECT,
model_name=MODEL_NAME,
version={'name': version_name},
operation='set_default')
# Ordering the tasks
bash_train_operator >> create_version_op >> set_version_default_op
トレーニングの結果、Gcloud ストレージ内のファイルが更新されます。このファイルが更新されるまで待機するオペレーターまたはセンサーを探しているので、GoogleCloudStorageObjectUpdatedSensor に気付きましたが、このファイルが更新されるまで再試行する方法がわかりません。別の解決策は、ジョブが完了するかどうかを確認することですが、その方法もわかりません。
どんな助けでも大歓迎です。