4

気流を構成し、いくつかのオペレーターを呼び出すいくつかの Dag と subDag を作成しました。

私の問題は、オペレーターがジョブを実行して終了したときに、結果を何らかの Python 構造で受け取りたいということです。例えば:

File1.py

  ...
    ...
    sub_dag_one=SubDagOperator(subdag=subdag_accessHive(
PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, STEP, macros,path,
       ),
        task_id=DELP_DAG_NAME,
        dag=dag,
    )

File2.py

  from airflow import DAG
    from airflow.operators import HiveOperator
def subdag_callHive(parent, child, args, step,
                         user_defined_macros, path
                        ):
        dag_subdag = DAG(
            dag_id='%s.%s' % (parent, child),
            default_args=args,
            schedule_interval="@daily",
            template_searchpath=path,
            user_defined_macros=user_defined_macros,
        )

        # some work...

        HiveOperator(
            task_id='some_id',
            hiveconf_jinja_translate=True,
            hql='select field1 from public.mytable limit 4;',
            trigger_rule='all_done',
            dag=dag_subdag,
        )

        return dag_subdag 

関数subdag_callHiveは、メインの Dag が定義され、必要な他のすべてのパラメーターが定義されている別の Python スクリプトから呼び出されます。

この場合、4 つの値になる HiveOperator (*select * from public.mytable limit 4;*) から結果を取得できるようにしたいだけです。

返された dag_subdag はオブジェクト< class 'airflow.models.DAG' >であり、呼び出しに渡されたすべての属性/データが含まれていますが、HiveOperator が行ったことに関する情報は含まれていません。

これは可能ですか?もしそうなら、どうすればそれを達成できますか。

4

1 に答える 1

8

必要に応じてフックを使用できます。基本的に、HiveOperator は同じことを行い、結果を操作する複数のメソッドを持つ Hive フックを呼び出します。

PythonOperator を使用して、ハイブ フックを開始する関数を呼び出します。

次の例が役に立ちます。

コードスニペット:

callHook = PythonOperator(
    task_id='foo',
    python_callable=do_work,
    dag=dag
)

def do_work():
    hiveserver = HiveServer2Hook()
    hql = "SELECT COUNT(*) FROM foo.bar"
    row_count = hiveserver.get_records(hql, schema='foo')
    print row_count[0][0]

利用可能なすべてのメソッドは、https ://github.com/apache/incubator-airflow/blob/master/airflow/hooks/hive_hooks.py にあります。

于 2016-07-11T14:41:27.277 に答える