9

私は標準的なMapReduceの例に似たものに取り組んでいます-単語数ですが、トップNの結果のみを取得しようとしているという点でひねりを加えています。

HDFSに非常に大量のテキストデータがあるとしましょう。そのテキスト内のすべての単語の単語数を提供するHadoopMapReduceジョブを構築する方法を示す例はたくさんあります。たとえば、私のコーパスが次の場合:

「これはテストデータのテストであり、これをテストするのに適しています」

標準のMapReduce単語カウントジョブの結果セットは次のようになります。

test:3、a:2、this:2、is:1など。

しかし、データセット全体で使用された上位3つの単語のみを取得したい場合はどうなりますか?

まったく同じ標準のMapReduce単語数カウントジョブを実行し、準備ができてすべての単語の数を吐き出したら、上位3つの結果を取得することはできますが、大量のデータが必要なため、少し非効率的です。シャッフルフェーズ中に移動します。

私が考えているのは、このサンプルが十分に大きく、データがHDFSでランダムに分散されている場合、各マッパーはすべての単語数をレデューサーに送信する必要はなく、一部のデータのみを送信する必要があるということです。トップデータ。したがって、1人のマッパーがこれを持っている場合:

a:8234、:5422、男性:4352、...... さらに多くの単語...、レアワード:1、奇妙な単語:1など。

次に、私がやりたいのは、各マッパーからレデューサーフェーズにトップ100程度の単語を送信することだけです。これは、すべてが完了したときに「レアワード」が突然トップ3に入る可能性がほとんどないためです。これにより、帯域幅とレデューサーの処理時間が節約されるようです。

これはコンバイナーフェーズで実行できますか?シャッフルフェーズの前にこの種の最適化が一般的に行われますか?

4

2 に答える 2

7

これは非常に良い質問です。Hadoop の単語カウントの例の非効率性に気付いたからです。

問題を最適化するための秘訣は次のとおりです。

HashMapローカル マップ ステージでベースのグループ化を行います。そのためにコンバイナーを使用することもできます。これは次のようになります。私はHashMultiSetGuava を使用しています。これにより、優れたカウント メカニズムが促進されます。

    public static class WordFrequencyMapper extends
      Mapper<LongWritable, Text, Text, LongWritable> {

    private final HashMultiset<String> wordCountSet = HashMultiset.create();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      String[] tokens = value.toString().split("\\s+");
      for (String token : tokens) {
        wordCountSet.add(token);
      }
    }

そして、クリーンアップ ステージで結果を出力します。

@Override
protected void cleanup(Context context) throws IOException,
    InterruptedException {
  Text key = new Text();
  LongWritable value = new LongWritable();
  for (Entry<String> entry : wordCountSet.entrySet()) {
    key.set(entry.getElement());
    value.set(entry.getCount());
    context.write(key, value);
  }
}

つまり、作業のローカル ブロックに単語をグループ化して、RAM を少し使用することでネットワークの使用量を削減しました。で同じことを行うこともできますが、Combinerグループに並べ替えているため、HashMultiset.

トップ N を取得するには、そのローカルのトップ N をHashMultiset出力コレクターに書き込み、reduce 側で通常の方法で結果を集計するだけです。これにより、ネットワーク帯域幅も大幅に節約できます。唯一の欠点は、クリーンアップ メソッドでワード カウント タプルをソートする必要があることです。

コードの一部は次のようになります。

  Set<String> elementSet = wordCountSet.elementSet();
  String[] array = elementSet.toArray(new String[elementSet.size()]);
  Arrays.sort(array, new Comparator<String>() {

    @Override
    public int compare(String o1, String o2) {
      // sort descending
      return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
    }

  });
  Text key = new Text();
  LongWritable value = new LongWritable();
  // just emit the first n records
  for(int i = 0; i < N, i++){
    key.set(array[i]);
    value.set(wordCountSet.count(array[i]));
    context.write(key, value);
  }

できるだけ多くの単語をローカルで実行する要点を理解してから、上位 N の上位 N を集計してください ;)

于 2012-11-28T16:58:53.890 に答える