私が理解している場合、最初のステップはヒストグラムを計算することです。
[attr, value] => frequency
ここで、は列で発生したfrequency
回数です。value
attr
次のステップは、ヒストグラムテーブルと元のデータを取得し、各行についてAVFを計算し、それらを並べ替えることです。
2つのパスで実行します。1つはヒストグラムを計算するためのmap-reduceパス、もう1つはヒストグラムを使用してAVFを見つけるためのmrパスです。また、ヒストグラム値とセル値を同じ場所に取得するのは厄介な獣になるため、罪悪感のない単一の定数ハッシュを使用します。(たとえば、map1にキーとして発行させ、reduce1に各キーのすべてのレコードを蓄積させ、それらをカウントして発行させます。2[attr val id]
番目のパスはキーとして使用して再構成し、各行を平均します)。[attr val]
[id attr val count]
id
ヒストグラムを計算するには、中間のステップを「並べ替え」ではなく「グループ」と考えると役立ちます。方法は次のとおりです。reduce入力はキーで並べ替えられるため、指定されたキーのすべてのレコードを累積し、別のキーが表示されたらすぐにカウントを出力します。ルビーのダンボに相当するウーコンには、があり、ダンボにもあるAccumulator
と思います。(動作するコードについては、以下を参照してください)。
これはあなたに
attr1 val1a frequency
attr1 val1b frequency
attr2 val2a frequency
...
attrN attrNz frequency
次のパスでは、そのデータをハッシュテーブル(メモリに収まる場合は単純なHash
(dictionary
)、収まらない場合は高速のKey-Valueストア)にロードし、各レコードのAVFをそのまま計算します。
これは、avfを計算するためのルビーコードの動作です。http://github.com/mrflip/wukong/blob/master/examples/stats/avg_value_frequency.rbを参照してください
ファーストパス
module AverageValueFrequency
# Names for each column's attribute, in order
ATTR_NAMES = %w[length width height]
class HistogramMapper < Wukong::Streamer::RecordStreamer
def process id, *values
ATTR_NAMES.zip(values).each{|attr, val| yield [attr, val] }
end
end
#
# For an accumulator, you define a key that is used to group records
#
# The Accumulator calls #start! on the first record for that group,
# then calls #accumulate on all records (including the first).
# Finally, it calls #finalize to emit a result for the group.
#
class HistogramReducer < Wukong::Streamer::AccumulatingReducer
attr_accessor :count
# use the attr and val as the key
def get_key attr, val, *_
[attr, val]
end
# start the sum with 0 for each key
def start! *_
self.count = 0
end
# ... and count the number of records for this key
def accumulate *_
self.count += 1
end
# emit [attr, val, count]
def finalize
yield [key, count].flatten
end
end
end
Wukong::Script.new(AverageValueFrequency::HistogramMapper, AverageValueFrequency::HistogramReducer).run
2回目のパス
module AverageValueFrequency
class AvfRecordMapper < Wukong::Streamer::RecordStreamer
# average the frequency of each value
def process id, *values
sum = 0.0
ATTR_NAMES.zip(values).each do |attr, val|
sum += histogram[ [attr, val] ].to_i
end
avf = sum / ATTR_NAMES.length.to_f
yield [id, avf, *values]
end
# Load the histogram from a tab-separated file with
# attr val freq
def histogram
return @histogram if @histogram
@histogram = { }
File.open(options[:histogram_file]).each do |line|
attr, val, freq = line.chomp.split("\t")
@histogram[ [attr, val] ] = freq
end
@histogram
end
end
end