10

データ パイプラインの気流を実験しています。残念ながら、これまでのところ、bigquery オペレーターで動作させることはできません。できる限りの解決策を探しましたが、まだ行き詰まっています.ローカルで実行されているシーケンシャルエグゼキューターを使用しています。

これが私のコードです:

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['example@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    dag_id='bigQueryPipeline', 
    default_args=default_args, 
    schedule_interval=timedelta(1)
)

t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',
    delegate_to=False,
    udf_config=False,
    dag=dag,
)

エラーメッセージ:

[2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project'
Traceback (most recent call last):
  File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module>
    args.func(args)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test
    ti.run(force=True, ignore_dependencies=True, test_mode=True)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute
    conn = hook.get_conn()
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn
    project = connection_extras['project']
4

3 に答える 3

11

明確に文書化されていないため、最終的に見つけるのにしばらく時間がかかりました。Airflow UI で、[管理] -> [接続] に移動します。その接続 ID は、パラメーター bigquery_connection_id によって参照されているものです。"extras" フィールドに、"project" の ak,v ペアを定義する json オブジェクトを追加する必要があります: ""

Airflow を実行しているボックスでアカウントを明示的に承認していない場合は、「service_account」と「key_path」のキーも追加する必要があります。(gcloud 認証)

于 2016-11-01T21:44:55.773 に答える
1

最近、両方bigquery_conn_idgoogle_cloud_storage_conn_id次のように指定して、同様の問題を修正しました。

t1 = BigQueryOperator(
  task_id='bigquery_test',
  bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
  destination_dataset_table=False,
  bigquery_conn_id='bigquery_default',             <-- Need these both
  google_cloud_storage_conn_id='bigquery_default', <-- becasue of inheritance 
  delegate_to=False,
  udf_config=False,
  dag=dag,
)

この回答の詳細を参照してください: https://stackoverflow.com/a/45664830/634627

于 2017-08-13T20:51:31.880 に答える