2

Python SDK で Google Cloud Dataflow を使用しています。

私はしたいと思います :

  • マスター PCollection から一意の日付のリストを取得する
  • そのリストの日付をループして、フィルター処理された PCollection (それぞれに一意の日付を持つ) を作成し、フィルター処理された各 PCollection を BigQuery の時分割テーブルのパーティションに書き込みます。

どうすればそのリストを入手できますか? 次の結合変換の後、ListPCollectionView オブジェクトを作成しましたが、そのオブジェクトを反復できません。

class ToUniqueList(beam.CombineFn):

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        if element not in accumulator:
            accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        return list(set(accumulators))

    def extract_output(self, accumulator):
        return accumulator


def get_list_of_dates(pcoll):

    return (pcoll
            | 'get the list of dates' >> beam.CombineGlobally(ToUniqueList()))

私はそれをすべて間違っていますか?それを行う最善の方法は何ですか?

ありがとう。

4

1 に答える 1

6

直接コンテンツを取得することはできません。Apache Beam または Dataflow パイプラインは、データを含むというよりは、プラン内の論理的な中間ノードであるためPCollection、何を処理する必要があるかを示すクエリ プランに似ています。PCollectionメイン プログラムは、計画 (パイプライン) を組み立てて開始します。

ただし、最終的には、日付で分割された BigQuery テーブルにデータを書き込もうとしています。このユースケースは現在、Java SDKとストリーミング パイプラインでのみサポートされています。

データに応じて複数の宛先にデータを書き込む、より一般的な処理については、BEAM-92に従ってください。

Google Cloud Dataflow を使用した、Parititoned BigQuery テーブルの作成/書き込みもご覧ください。

于 2017-01-03T20:56:07.157 に答える