2

巨大なテキスト ファイルがあり、各チャンクが 5 行になるようにファイルを分割したいと考えていました。独自の GWASInputFormat および GWASRecordReader クラスを実装しました。ただし、私の質問は、次のコード ( http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/からコピーしたもの) の initialize() メソッド内にあります。次の行があります

FileSplit split = (FileSplit) genericSplit;
final Path file = split.getPath();
Configuration conf = context.getConfiguration();

私の質問は、私のGWASRecordReaderクラスでinitialize()メソッドが呼び出されるまでに、ファイルはすでに分割されていますか? 私はGWASRecordReaderクラスでそれ(分割)をしていると思いました。私の思考プロセスが正しいかどうか教えてください。

package com.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class GWASRecordReader extends RecordReader<LongWritable, Text> {

private final int NLINESTOPROCESS = 5;
private LineReader in;
private LongWritable key;
private Text value = new Text();
private long start = 0;
private long pos = 0;
private long end = 0;
private int maxLineLength;

public void close() throws IOException {
    if(in != null) {
        in.close();
    }
}

public LongWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
}

public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
}

public float getProgress() throws IOException, InterruptedException {
    if(start == end) {
        return 0.0f;
    }
    else {
        return Math.min(1.0f, (pos - start)/(float) (end - start));
    }
}

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    final Path file = split.getPath();
    Configuration conf = context.getConfiguration();
    this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);
    FileSystem fs = file.getFileSystem(conf);
    start = split.getStart();
    end = start + split.getLength();
    System.out.println("---------------SPLIT LENGTH---------------------" + split.getLength());
    boolean skipFirstLine = false;
    FSDataInputStream filein = fs.open(split.getPath());

    if(start != 0) {
        skipFirstLine = true;
        --start;
        filein.seek(start);
    }

    in = new LineReader(filein, conf);
    if(skipFirstLine) {
        start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
}

public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
        key = new LongWritable();
    }

    key.set(pos);

    if (value == null) {
        value = new Text();
    }
    value.clear();
    final Text endline = new Text("\n");
    int newSize = 0;
    for(int i=0; i<NLINESTOPROCESS;i++) {
        Text v = new Text();
        while( pos < end) {
            newSize = in.readLine(v ,maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            value.append(v.getBytes(), 0, v.getLength());
            value.append(endline.getBytes(),0,endline.getLength());
            if(newSize == 0) {
                break;
            }
            pos += newSize;
            if(newSize < maxLineLength) {
                break;
            }
        }
    }

    if(newSize == 0) {
        key = null;
        value = null;
        return false;
    } else {
        return true;
    }
}
}
4

1 に答える 1

7

はい、入力ファイルはすでに分割されています。基本的には次のようになります。

your input file(s) -> InputSplit -> RecordReader -> Mapper...

基本的にInputSplit、入力をチャンクに分割し、RecordReaderこれらのチャンクをキーと値のペアに分割します。InputSplitRecordReaderは、ご利用いただくことで決まりますのでご了承くださいInputFormat。たとえば、TextInputFormatを使用FileSplitして入力を分割LineRecordReaderし、位置をキーとして個々の行を処理し、行自体を値として処理します。したがって、あなたの中で、渡されているものを確認するために使用するGWASInputFormat種類を調べる必要があります。FileSplitGWASRecordReader

NLineInputFormat「N行の入力を1つの分割として分割する」ものを調べることをお勧めします。あなたが自分でやろうとしていることを正確に行うことができるかもしれません。

一度に 5 行を値として取得し、最初の行番号をキーとして取得しようとしている場合は、customizedNLineInputFormatと customを使用してこれを行うことができると思いますLineRecordReader。入力形式は入力を 5 行のチャンクに分割できるため、入力の分割についてそれほど心配する必要はないと思います。RecordReaderと非常に似ていますが、チャンクLineRecordReaderの開始のバイト位置を取得する代わりに、行番号を取得します。そのため、小さな変更を除いて、コードはほとんど同じです。したがって、基本的にコピーして貼り付けることができますNLineInputFormatLineRecordReader、入力形式で行番号を取得するレコードリーダーを使用できます。コードは非常に似ています。

于 2012-11-12T17:52:28.173 に答える