0

このサンプル レコード、100,1:2:3 があります。

100,1 100,2 100,3
として
正規化したい

私の同僚は、これを実現するために豚のスクリプトを書きましたが、私の MapReduce コードにはさらに時間がかかりました。以前はデフォルトの TextInputformat を使用していました。しかし、パフォーマンスを改善するために、カスタム RecordReader を使用して、カスタム入力形式クラスを作成することにしました。LineRecordReader クラスを参考に、以下のコードを書いてみました。

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

import com.normalize.util.Splitter;

public class NormalRecordReader extends RecordReader<Text, Text> {

    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private Text key = null;
    private Text value = null;
    private Text line = null;

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

        start = split.getStart();
        end = start + split.getLength();

        final Path file = split.getPath();

        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        in = new LineReader(fileIn, job);
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        int newSize = 0;
        if (line == null) {
            line = new Text();
        }

        while (pos < end) {
            newSize = in.readLine(line);
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }

            // line too long. try again
            System.out.println("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }
        Splitter splitter = new Splitter(line.toString(), ",");
        List<String> split = splitter.split();

        if (key == null) {
            key = new Text();
        }
        key.set(split.get(0));

        if (value == null) {
            value = new Text();
        }
        value.set(split.get(1));

        if (newSize == 0) {
            key = null;
            value = null;
            return false;

        } else {
            return true;
        }
    }

    @Override
    public Text getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /**
     * Get the progress within the split
     */
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float)(end - start));
        }
    }

    public synchronized void close() throws IOException {
        if (in != null) {
            in.close(); 
        }
    }
}

これは機能しますが、パフォーマンスの向上は見られません。ここでは、「,」で記録を破り、100 をキーとして、1,2,3 を値として設定しています。次のことを行うマッパーのみを呼び出します。

public void map(Text key, Text value, Context context) 
        throws IOException, InterruptedException {

    try {
        Splitter splitter = new Splitter(value.toString(), ":");
        List<String> splits = splitter.split();

        for (String split : splits) {
            context.write(key, new Text(split));
        }

    } catch (IndexOutOfBoundsException ibe) {
        System.err.println(value + " is malformed.");
    }
}

String のスプリッターが遅いことが分かったので、スプリッター クラスを使用してデータを分割します。メソッドは次のとおりです。

public List<String> split() {

    List<String> splitData = new ArrayList<String>();
    int beginIndex = 0, endIndex = 0;

    while(true) {

        endIndex = dataToSplit.indexOf(delim, beginIndex);
        if(endIndex == -1) {
            splitData.add(dataToSplit.substring(beginIndex));
            break;
        }

        splitData.add(dataToSplit.substring(beginIndex, endIndex));
        beginIndex = endIndex + delimLength;
    }

    return splitData;
}

コードを何らかの方法で改善できますか?

4

1 に答える 1

1

コメントの代わりに改善できると思うことをここに要約させてください。

  • 説明したように、現在、レコードごとに複数回オブジェクトを作成していTextます (回数はトークンの数と同じになります)。入力が小さい場合はそれほど問題にならないかもしれませんが、適度なサイズのジョブでは大きな問題になる可能性があります。これを修正するには、次の手順を実行します。

    private final Text text = new Text();
    
    public void map(Text key, Text value, Context context) {
        ....
        for (String split : splits) {
            text.set(split);
            context.write(key, text);
        }
    }
    
  • 分割のために、現在行っていることは、すべてのレコードに対して新しい配列を割り当て、この配列にデータを入力し、この配列を反復処理して出力を書き込むことです。この場合、状態を維持していないため、実際には配列は必要ありません。提供したメソッドの実装を使用するsplitと、データを 1 回渡すだけで済みます。

    public void map(Text key, Text value, Context context) {
        String dataToSplit = value.toString();
        String delim = ":";
    
        int beginIndex = 0;
        int endIndex = 0;
    
        while(true) {
            endIndex = dataToSplit.indexOf(delim, beginIndex);
            if(endIndex == -1) {
                text.set(dataToSplit.substring(beginIndex));
                context.write(key, text);
                break;
            }
    
            text.set(dataToSplit.substring(beginIndex, endIndex));
            context.write(key, text);
            beginIndex = endIndex + delim.length();
        }
    }
    
  • 独自の を作成する理由がよくわかりません。InputFormatそれKeyValueTextInputFormatはまさにあなたが必要としているものであり、おそらく既に最適化されているようです。使用方法は次のとおりです。

    conf.set("key.value.separator.in.input.line", ",");
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    
  • あなたの例に基づいて、各レコードのキーは整数のようです。それが常に当てはまる場合、Textマッパーの入力キーとして a を使用することは最適ではなく、データの内容に応じて、IntWritableまたは場合によっては a である必要があります。ByteWritable

  • IntWritable同様に、 orByteWritableをマッパーの出力キーおよび出力値として使用したいとします。

また、意味のあるベンチマークが必要な場合は、可能であれば数 Gbs など、より大きなデータセットでテストする必要があります。特に分散システムのコンテキストでは、1 分間のテストはあまり意味がありません。1 つのジョブは、小さな入力では別のジョブよりも速く実行される可能性がありますが、大きな入力では傾向が逆転する可能性があります。

そうは言っても、Pig は Map/Reduce に変換するときに内部で多くの最適化を行っていることも知っておく必要があります。そのため、Pig が Java Map/Reduce コードよりも高速に実行されることにあまり驚かず、過去。私が提案した最適化を試してみてください。それでもまだ十分に速くない場合は、Map/Reduce ジョブのプロファイリングに関するいくつかの便利なトリックを使用したリンクを参照してください (特に、プロファイリングに関するヒント 7 は私が役立つと感じたものです)。

于 2013-01-20T08:03:52.120 に答える