私は Google Cloud Composer を初めて使用し、作成した DAG で奇妙な問題と思われる問題に遭遇しました。クラウド ストレージから tar.gz ファイルを取得し、それを .gz ファイルとして再圧縮してから、.gz ファイルを BigQuery にロードするプロセスがあります。昨日、作成された「シャード」から新しいテーブルへの挿入であるプロセスに新しいステップを追加しようとしました。
DAG 実行のステップの順序を変更するまで、これを機能させることができませんでした。私の DAG には、「delete_tar_gz_files_op」というステップがあります。これが「insert_daily_file_into_nli_table_op」の前に実行された場合、挿入は実行されませんでした (Composer では失敗せず、まったく実行されないように見えました)。コードに他の変更を加えずにこれら 2 つのステップの順序を入れ替えると、挿入は期待どおりに機能します。誰がこれを引き起こす可能性があるか知っていますか? これらの 2 つのステップはまったく関連していないため、なぜこれが起こるのかわかりません。1 つは、ある大きなクエリ テーブルから別のテーブルへの挿入クエリを実行します。もう 1 つは、クラウド ストレージにある tar.gz ファイルを削除します。
現在機能している私のdag実行順序:
initialize >> FilesToProcess >> download_file >> convert_task >> upload_task >> gcs_to_bq >> archive_files_op >> insert_daily_file_into_nli_table_op >> delete_tar_gz_files_op
使用されるコードの一部:
#The big query operator inserts the files from the .gz file into a table in big query.
gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
task_id='load_basket_data_into_big_query'+job_desc,
bucket="my-processing-bucket",
bigquery_conn_id='bigquery_default',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
compression='GZIP',
source_objects=['gzip/myzip_'+process_date+'.gz'],
destination_project_dataset_table='project.dataset.basket_'+clean_process_date,
field_delimiter='|',
skip_leading_rows=0,
google_cloud_storage_conn_id="bigquery_default",
schema_object="schema.json",
dag=dag
)
#The created shard is then inserted into basket_raw_nli.basket_nli. This is a partitioned table which contains only the NLI subtype
insert_daily_file_into_nli_table_op = bigquery_operator.BigQueryOperator(
task_id='insert_daily_file_into_nli_table_op_'+job_desc,
bql=bqQuery,
use_legacy_sql=False,
bigquery_conn_id='bigquery_default',
write_disposition='WRITE_APPEND',
allow_large_results=True,
destination_dataset_table=False,
dag=dag)
#The tar file created can now be deleted from the raw folder
delete_tar_gz_files_op=python_operator.PythonOperator(
task_id='delete_tar_gz_files_'+job_desc,
python_callable=delete_tar_gz_files,
op_args=[file, process_date],
provide_context=False,
dag=dag)
def delete_tar_gz_files(file, process_date):
execution_command='gsutil rm ' + source_dir + '/' + file
print(execution_command)
returncode=os.system(execution_command)
if returncode != 0:
#logging.error("Halting process...")
exit(1)
手動実行ステータス: 実行ステータス