Python SDK で Google Cloud Dataflow を使用しています。
私はしたいと思います :
- マスター PCollection から一意の日付のリストを取得する
- そのリストの日付をループして、フィルター処理された PCollection (それぞれに一意の日付を持つ) を作成し、フィルター処理された各 PCollection を BigQuery の時分割テーブルのパーティションに書き込みます。
どうすればそのリストを入手できますか? 次の結合変換の後、ListPCollectionView オブジェクトを作成しましたが、そのオブジェクトを反復できません。
class ToUniqueList(beam.CombineFn):
def create_accumulator(self):
return []
def add_input(self, accumulator, element):
if element not in accumulator:
accumulator.append(element)
return accumulator
def merge_accumulators(self, accumulators):
return list(set(accumulators))
def extract_output(self, accumulator):
return accumulator
def get_list_of_dates(pcoll):
return (pcoll
| 'get the list of dates' >> beam.CombineGlobally(ToUniqueList()))
私はそれをすべて間違っていますか?それを行う最善の方法は何ですか?
ありがとう。