10

csv ファイルを読み取り、Apache Beam データフローを使用して BigQuery に書き込みたいと考えています。これを行うには、データを辞書の形式で BigQuery に提示する必要があります。これを行うためにApacheビームを使用してデータを変換するにはどうすればよいですか?

私の入力 csv ファイルには 2 つの列があり、BigQuery で後続の 2 列のテーブルを作成したいと考えています。私は BigQuery でデータを作成する方法を知っています。それは簡単です。私が知らないのは、csv を辞書に変換する方法です。以下のコードは正しくありませんが、私が何をしようとしているのかがわかります。

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
   beam.io.BigQuerySink(
   output_table,
   schema='month:INTEGER, tornado_count:INTEGER',
   create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
   write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
4

2 に答える 2

27

編集: バージョン 2.12.0 の時点で、Beam にはfileio、ソースを再実装することなく CSV から読み取ることができる新しい変換が付属しています。これは次のように行うことができます。

def get_csv_reader(readable_file):
  # You can return whichever kind of reader you want here
  # a DictReader, or a normal csv.reader.
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with Pipeline(...) as p:
  content_pc = (p
                | beam.io.fileio.MatchFiles("/my/file/name")
                | beam.io.fileio.ReadMatches()
                | beam.Reshuffle()  # Useful if you expect many matches
                | beam.FlatMap(get_csv_reader))

私は最近、Apache Beam のテストを作成しました。Github リポジトリをご覧ください。


古い答えは、ソースの再実装に依存していました。これは、これを行うための主な推奨方法ではなくなりました:)

アイデアは、解析された CSV 行を返すソースを持つことです。これを行うには、クラスをサブクラス化してFileBasedSourceCSV 解析を含めます。特に、read_records関数は次のようになります。

class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    self._file = self.open_file(file_name)

    reader = csv.reader(self._file)

    for rec in reader:
      yield rec
于 2016-12-15T19:25:32.253 に答える