入力ファイルは大きなgzip圧縮されたファイルであるため、分割しないようにカスタムファイルリーダーを作成しました。最初のマッパージョブで単純にそれらをgunzip圧縮する必要があります。「HadoopTheDefinitiveGuide」の例に従いましたが、BytesWritableに読み込もうとするとヒープエラーが発生します。これは、バイト配列のサイズが85713669であるためだと思いますが、この問題を解決する方法がわかりません。
コードは次のとおりです。
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
public void close() throws IOException {
// do nothing
}
@Override
public NullWritable getCurrentKey() throws IOException,
InterruptedException {
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return processed ? 1.0f : 0.0f;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
}