編集: 2020 年です。Kiba ETL v3 には、これを行うためのはるかに優れた方法が含まれています。すべての関連情報については、この記事https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3をご覧ください。
木場作者はこちら!主にデータサイズと実際のニーズに応じて、さまざまな方法でこれを実現できます。ここにいくつかの可能性があります。
Kiba スクリプトで変数を使用して集計する
require 'awesome_print'
transform do |r|
r[:amount] = BigDecimal.new(r[:amount])
r
end
total_amounts = Hash.new(0)
transform do |r|
total_amounts[r[:name]] += r[:amount]
r
end
post_process do
# pretty print here, but you could save to a CSV too
ap total_amounts
end
これは最も簡単な方法ですが、非常に柔軟です。
ただし、集計はメモリに保持されるため、シナリオによっては、これで十分かどうかは異なります。現在、Kiba はモノスレッド (ただし、"Kiba Pro" はマルチスレッドになります) であるため、ロックを追加したり、集計にスレッドセーフな構造を使用したりする必要はありません。
post_process ブロックから TextQL を呼び出す
集計するもう 1 つの迅速かつ簡単な方法は、最初に集計されていない CSV ファイルを生成し、次にTextQlを利用して実際に集計を行うことです。
destination CsvSource, 'non-aggregated-output.csv', [:name, :amount]
post_process do
query = <<SQL
select
name,
/* apparently sqlite has reduced precision, round to 2 for now */
round(sum(amount), 2) as total_amount
from tbl group by name
SQL
textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
end
次のヘルパーが定義されています。
def system!(cmd)
raise "Failed to run command #{command}" unless system(command)
end
def textql(source_file, query, output_file)
system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
# this one uses csvfix to pretty print the table
system! "cat #{output_file} | csvfix ascii_table"
end
ただし、計算を行うときは精度に注意してください。
インメモリ集計先の記述
ここで機能する便利なトリックは、特定の宛先をクラスでラップして集計を行うことです。これは次のようになります。
class InMemoryAggregate
def initialize(sum:, group_by:, destination:)
@aggregate = Hash.new(0)
@sum = sum
@group_by = group_by
# this relies a bit on the internals of Kiba, but not too much
@destination = destination.shift.new(*destination)
end
def write(row)
# do not write, but count here instead
@aggregate[row[@group_by]] += row[@sum]
end
def close
# use close to actually do the writing
@aggregate.each do |k,v|
# reformat BigDecimal additions here
value = '%0.2f' % v
@destination.write(@group_by => k, @sum => value)
end
@destination.close
end
end
この方法で使用できます:
# convert your string into an actual number
transform do |r|
r[:amount] = BigDecimal.new(r[:amount])
r
end
destination CsvDestination, 'non-aggregated.csv', [:name, :amount]
destination InMemoryAggregate,
sum: :amount, group_by: :name,
destination: [
CsvDestination, 'aggregated.csv', [:name, :amount]
]
post_process do
system!("cat aggregated.csv | csvfix ascii_table")
end
このバージョンの優れた点は、アグリゲーターをさまざまな宛先 (データベースなど) で再利用できることです。
ただし、これにより、最初のバージョンと同様に、すべての集計がメモリに保持されることに注意してください。
集約機能を備えたストアへの挿入
もう 1 つの方法 (特にボリュームが非常に大きい場合に役立ちます) は、結果のデータをデータを集約できるものに送信することです。これは、通常の SQL データベース、Redis、または必要に応じてクエリを実行できる、より手の込んだものである可能性があります。
私が言ったように、実装は実際のニーズに大きく依存します。ここであなたに合ったものが見つかることを願っています!