0

私のマッパーの 1 つは、part-0、part-1、part-2 などのファイルに分散されたいくつかのログを生成します。これらのそれぞれには、いくつかのクエリと、そのクエリに関連するいくつかのデータがあります。

part-0

q             score         
1 ben 10      4.01
horse shoe    5.96
...

part-1

1 ben 10        3.23
horse shoe      2.98
....

and so on for part-2,3 etc.

ここで、上記の「1 ben 10」という同じクエリ q が、part-1、part-2 などに存在します。

ここで、同じクエリを収集し、それらのスコアを集計 (合計) できる map reduce フェーズを作成する必要があります。

私のマッパー関数は ID にすることができ、reduce でこのタスクを達成します。

出力は次のようになります。

q       aggScore
1 ben 10    7.24
horse shoe  8.96
...

簡単な作業のように思えますが、これをどのように進めることができるかを考えることができません (たくさん読んでも、実際には進めることができません)。一般的なアルゴリズムの問​​題に関して考えることができます。最初に一般的なクエリを収集し、それらのスコアを合計します。

pythonic ソリューションまたはアルゴリズム (map reduce) のヒントがあれば、本当に感謝しています。

4

1 に答える 1

1

MapReduce ソリューションは次のとおりです。

マップ入力: 各入力ファイル (part-0、part-1、part-2、...) は、個別の (別個の) マップ タスクに入力できます。

入力ファイルの foreach 入力行で、Mapper は<q,aggScore>. 1 つのファイル内のクエリに複数のスコアがある場合、Map はそれらをすべて合計します。それ以外の場合、各クエリが各ファイルに 1 回だけ表示されることがわかっている場合、map は<q,aggScore>入力行ごとにそのまま発行される恒等関数になります。

Reducer の入力は次の形式です<q,list<aggScore1,aggScore2,...>。 Reducer 操作は、よく知られている MapReduce の例に似ていますwordcount。Hadoop を使用している場合は、Reducer に対して次の方法を使用できます。

public void reduce(Text q, Iterable<IntWritable> aggScore, Context context) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : aggScore) {
      sum += val.get();
   }
   context.write(q, new IntWritable(sum));
}

aggScoresこのメソッドは、特定のすべてを合計しq、目的の出力を提供します。レデューサーの python コードは次のようになります (ここqにキーと値のリストaggScoresがあります)。

def reduce(self, key, values, output, reporter):
    sum = 0
    while values.hasNext():
        sum += values.next().get()
    output.collect(key, IntWritable(sum))
于 2013-02-26T07:54:34.900 に答える