4

MapReduceプログラムのReduceフェーズでは、以下のように、提供されたイテレーターの各値を連結するだけの操作を実行します。

public void reduce(Text key, Iterator<text> values,
                    OutputCollector<Text, Text> output, Reporter reporter) {
    Text next;
    Text outKey = new Text()
    Text outVal = new Text();
    StringBuilder sb = new StringBuilder();
    while(values.hasNext()) {
        next = values.next();
        sb.append(next.toString());
        if (values.hasNext())
            sb.append(',');
    }
    outKey.set(key.toString());
    outVal.set(sb.toSTring());
    output.collect(outKey,outVal);
}

私の問題は、reduce出力値の一部が巨大なテキスト行であるということです。非常に大きいため、初期サイズが非常に大きい場合でも、イテレータのすべてのコンテキストに対応するために、文字列バッファのサイズを数倍に増やす(2倍にする)必要があり、メモリの問題が発生します。

従来のJavaアプリケーションでは、これは、ファイルへのバッファ書き込みが出力の書き込みに適した方法であることを示しています。Hadoopで非常に大きな出力キーと値のペアをどのように処理しますか?結果をHDFS上のファイルに直接ストリーミングする必要がありますか(reduce呼び出しごとに1つのファイル)?output.collectメソッド以外に、出力をバッファリングする方法はありますか?

注:メモリ/ヒープサイズを可能な限り最大限に増やしました。また、いくつかの情報源は、レデューサーの数を増やすとメモリ/ヒープの問題に役立つ可能性があることを示していますが、ここでの問題は、容量を拡張している間のSringBuilderの使用に直接起因しています。

ありがとう

4

2 に答える 2

4

なぜあなたが巨大な価値を持ちたいのか理解できませんが、これを行う方法があります。

独自の OutputFormat を記述する場合はRecordWriter.write(Key, Value)、Key 値が null かどうかに基づいて値の連結を処理するようにメソッドの動作を修正できます。

このようにして、リデューサーで次のようにコードを記述できます (キーの最初の出力は実際のキーであり、その後はすべて null キーです。

public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter) {
  boolean firstKey = true;
  for (Text value : values) {
    output.collect(firstKey ? key : null, value);
    firstKey = false;
  }
}

実際RecordWriter.write()には、null キー/値の連結ロジックを処理する次のロジックがあります。

    public synchronized void write(K key, V value) throws IOException {

        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }

        if (!nullKey) {
            // if we've written data before, append a new line
            if (dataWritten) {
                out.write(newline);
            }

            // write out the key and separator
            writeObject(key);
            out.write(keyValueSeparator);
        } else if (!nullValue) {
            // write out the value delimiter
            out.write(valueDelimiter);
        }

        // write out the value
        writeObject(value);

        // track that we've written some data
        dataWritten = true;
    }

    public synchronized void close(Reporter reporter) throws IOException {
        // if we've written out any data, append a closing newline
        if (dataWritten) {
            out.write(newline);
        }

        out.close();
    }

最後に書き出されたレコードに末尾の改行を書き込むように close メソッドも修正されていることに気付くでしょう。

完全なコード リストはpastebinにあります。テスト出力は次のとおりです。

key1    value1
key2    value1,value2,value3
key3    value1,value2
于 2012-04-14T02:21:50.670 に答える
2

単一の出力キー値がメモリよりも大きくなる可能性がある場合、標準出力メカニズムが適していないことを意味します。これは、インターフェイスの設計により、ストリームではなくキーと値のペアを渡す必要があるためです。
最も簡単な解決策は、出力を HDFS ファイルに直接ストリーミングすることだと思います。
出力形式でデータを渡す理由がある場合は、次の解決策をお勧めします。a) ローカルの一時ディレクトリに書き込む
b) ファイルの名前を出力形式の値として渡す。

おそらく最も効果的ですが、少し複雑な解決策は、メモリマップファイルをバッファとして使用することです。十分なメモリがある限りメモリ内に存在し、必要に応じて OS がディスクへの効率的なスピルを処理します。

于 2012-04-13T17:38:10.397 に答える