7

CSV から宛先 CSV へのソースを持つ Kiba Etl スクリプトを作成したい

Kiba ETL スクリプト ファイル

source CsvSource, 'users.csv', col_sep: ';', headers: true, header_converters: :symbol

transform VerifyFieldsPresence, [:name, :euro]

transform AggregateFields, { sum: :euro, group_by: :name}

transform RenameField,from: :euro, to: :total_amount

destination CsvDestination, 'result.csv', [:name, :total_amount]

ユーザー.csv

date;euro;name
7/3/2015;10;Jack
7/3/2015;85;Jill
8/3/2015;6;Jack
8/3/2015;12;Jill
9/3/2015;99;Mack

result.csv (期待される結果)

total_amount;name
16;Jack
97;Jill
99;Mack

etl トランスフォーマーは一度に 1 つの行で次々に実行されますが、2 番目のトランスフォーマーの動作は、transform メソッドに渡されるクラスでアクセスできない行のコレクション全体に依存します。

transform AggregateFields, { sum: :euro, group_by: :name }

kiba gemを使用してこの動作を実現できるものはありますか
? よろしくお願いします

4

1 に答える 1

9

編集: 2020 年です。Kiba ETL v3 には、これを行うためのはるかに優れた方法が含まれています。すべての関連情報については、この記事https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3をご覧ください。

木場作者はこちら!主にデータサイズと実際のニーズに応じて、さまざまな方法でこれを実現できます。ここにいくつかの可能性があります。

Kiba スクリプトで変数を使用して集計する

require 'awesome_print'

transform do |r|
  r[:amount] = BigDecimal.new(r[:amount])
  r
end

total_amounts = Hash.new(0)

transform do |r|
  total_amounts[r[:name]] += r[:amount]
  r
end

post_process do
  # pretty print here, but you could save to a CSV too
  ap total_amounts
end

これは最も簡単な方法ですが、非常に柔軟です。

ただし、集計はメモリに保持されるため、シナリオによっては、これで十分かどうかは異なります。現在、Kiba はモノスレッド (ただし、"Kiba Pro" はマルチスレッドになります) であるため、ロックを追加したり、集計にスレッドセーフな構造を使用したりする必要はありません。

post_process ブロックから TextQL を呼び出す

集計するもう 1 つの迅速かつ簡単な方法は、最初に集計されていない CSV ファイルを生成し、次にTextQlを利用して実際に集計を行うことです。

destination CsvSource, 'non-aggregated-output.csv', [:name, :amount]

post_process do
  query = <<SQL
    select
      name,
      /* apparently sqlite has reduced precision, round to 2 for now */
      round(sum(amount), 2) as total_amount
    from tbl group by name
SQL

  textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
end

次のヘルパーが定義されています。

def system!(cmd)
  raise "Failed to run command #{command}" unless system(command)
end

def textql(source_file, query, output_file)
  system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
  # this one uses csvfix to pretty print the table
  system! "cat #{output_file} | csvfix ascii_table"
end

ただし、計算を行うときは精度に注意してください。

インメモリ集計先の記述

ここで機能する便利なトリックは、特定の宛先をクラスでラップして集計を行うことです。これは次のようになります。

class InMemoryAggregate
  def initialize(sum:, group_by:, destination:)
    @aggregate = Hash.new(0)
    @sum = sum
    @group_by = group_by
    # this relies a bit on the internals of Kiba, but not too much
    @destination = destination.shift.new(*destination)
  end

  def write(row)
    # do not write, but count here instead
    @aggregate[row[@group_by]] += row[@sum]
  end

  def close
    # use close to actually do the writing
    @aggregate.each do |k,v|
      # reformat BigDecimal additions here
      value = '%0.2f' % v
      @destination.write(@group_by => k, @sum => value)
    end
    @destination.close
  end
end

この方法で使用できます:

# convert your string into an actual number
transform do |r|
  r[:amount] = BigDecimal.new(r[:amount])
  r
end

destination CsvDestination, 'non-aggregated.csv', [:name, :amount]

destination InMemoryAggregate,
  sum: :amount, group_by: :name,
  destination: [
    CsvDestination, 'aggregated.csv', [:name, :amount]
  ]

post_process do
  system!("cat aggregated.csv | csvfix ascii_table")
end

このバージョンの優れた点は、アグリゲーターをさまざまな宛先 (データベースなど) で再利用できることです。

ただし、これにより、最初のバージョンと同様に、すべての集計がメモリに保持されることに注意してください。

集約機能を備えたストアへの挿入

もう 1 つの方法 (特にボリュームが非常に大きい場合に役立ちます) は、結果のデータをデータを集約できるものに送信することです。これは、通常の SQL データベース、Redis、または必要に応じてクエリを実行できる、より手の込んだものである可能性があります。

私が言ったように、実装は実際のニーズに大きく依存します。ここであなたに合ったものが見つかることを願っています!

于 2015-06-30T23:01:15.143 に答える