1

完全なファイル データを値とキーのファイル名として持つという特別な要件があるため、Hadoop でテキスト ファイルと gzip ファイルの両方を読み取るカスタム レコード リーダーを作成しました。ソースは次のとおりです。

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

    private CompressionCodecFactory compressionCodecs = null;
    private FileSplit fileSplit;
    private Configuration conf;
    private InputStream in;
    private Text key = new Text("");
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {

        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();

        final Path file = fileSplit.getPath();
        compressionCodecs = new CompressionCodecFactory(conf);

        final CompressionCodec codec = compressionCodecs.getCodec(file);
        System.out.println(codec);
        FileSystem fs = file.getFileSystem(conf);
        in = fs.open(file);

        if (codec != null) {
            in = codec.createInputStream(in);
        }
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            key.set(file.getName());

            try {
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }

            processed = true;
            return true;
        }

        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Do nothing
    }

}

問題は、コードが不完全なファイル データを読み取っていることです。これはおそらく、fileSplit (圧縮ファイルを指す) を使用してコンテンツの長さを決定しているため、値が小さくなっていることが原因です。したがって、これにより不完全なデータが Mapper に渡されます。

gzip ファイル データの実際の長さを取得する方法や、完全なデータを読み取るように RecordReader を変更する方法を教えてください。

4

2 に答える 2

1

@Chris Whiteの回答を拡張して、彼が提供したコードに特定の構文変更を加える必要がありました。それは次のとおりです。

fileLength = (int) fileSplit.getLength();
compressionCodecs = new CompressionCodecFactory(conf);

final CompressionCodec codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(conf);
in = fs.open(file);

if (codec != null) {
    if (codec instanceof GzipCodec) {
        byte[] len = new byte[4];
        try {
            in.skip(fileLength - 4);
            IOUtils.readFully(in, len, 0, len.length);
            fileLength = (len[3] << 24) | (len[2] << 16) + (len[1] << 8) + len[0];
        } finally {
            in.close();
        }
    }

    in = fs.open(file);
    in = codec.createInputStream(in);
}

@Chris White さん、ご意見ありがとうございます。あなたなしではできなかったでしょう:)

于 2013-01-29T06:57:02.450 に答える
0

GZip ファイルの場合、最後の 4 バイトまでスキップできます (仕様によれば、元の圧縮されていないファイル サイズを返す必要があります)。値は 2^32 のモジュロであることに注意してください。元のファイルがこれよりも大きくなると予想される場合は注意してください。

したがって、初期化メソッドは次のようなものに修正できます (テストされていません!):

final CompressionCodec codec = compressionCodecs.getCodec(file);
System.out.println(codec);
FileSystem fs = file.getFileSystem(conf);
in = fs.open(file);

length = fileSplit.getLength();
if (codec instanceof GZipCodec) {
  // skip to last 4 bytes
  in.seek(length-4);

  // read size
  length = in.readInt();

  // reset stream position
  in.seek(0);
}

これで、nextKeyValue() メソッドで使用できる実際のファイル長 (非圧縮および Gzip 圧縮の場合) が得られます。

于 2013-01-25T12:07:42.630 に答える