多くのファイルがあり、そのうちのいくつかは非常に小さいです。マッパーの数を減らすために、CombineFileInputFormat を使用したいと思います。ファイル名は、マッパー出力のキーの一部として使用されます。
CombineFileSplit の各チャンクのファイル名を取得するために、次のようないくつかの方法を試しましたが、すべて失敗しました。
1)conf.set("map.input.file", split.getPath(idx).toString());
関数で見る
initNextRecordReader()
クラスのCombineFileRecordReader
。しかしNullPointerException
map()
として、私の機能で起こりますcontext.getConfiguration().get("map.input.file")
戻りますnull
。
2)((FileSplit) (context.getInputSplit())).getPath().getName()
マッパーでも試してみましたが、java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.CombineFileSplit cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit
起こります。
では、 CombineFileSplit で各ファイル名を取得するにはどうすればよいですか?
================================================== ==========
入力ファイルは lzo 圧縮されており、現時点ではインデックス化されていません。
以下は私のコードです:
私は次のように CombineFileInputFormat を実装します。
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0,
TaskAttemptContext arg1) throws IOException {
// TODO Auto-generated method stub
return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) arg0, arg1, CombineLzoLineRecordReader.class);
}
}
これは、LzoLineRecordReader を拡張した CombineLzoLineRecordReader です。
public class CombineLzoLineRecordReader extends LzoLineRecordReader {
private int index;
public CombineLzoLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
throws IOException, InterruptedException {
this.index = index;
}
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
CombineFileSplit combineSplit = (CombineFileSplit) genericSplit;
FileSplit fileSplit = new FileSplit(combineSplit.getPath(index), combineSplit.getOffset(index), combineSplit.getLength(index), combineSplit.getLocations());
super.initialize(fileSplit, context);
}
}
そして、私のマップメソッドは次のようになります:
private String getName(String filePath) {
String[] filePathDir = filePath.split("/");
return filePathDir[filePathDir.length - 1];
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String name = getName(context.getConfiguration().get("map.input.file"));
line = new String(value.getBytes(), 0, value.getLength(), "ISO-8859-1");
lineFields = line.split("\t",-1);
if (lineFields != null && lineFields.length >= 20) {
// do something ...
}
}
そしてエラー情報:
13/06/14 17:02:50 INFO mapred.JobClient: Task Id : attempt_201209101415_762760_m_000000_0, Status : FAILED
java.lang.NullPointerException
at com.netease.devilfish.hadoop.job.LogAnalysisDailyMapper.getName(Unknown Source)