段落全体を1行ずつではなくマッパーに送信するために、RecordReaderクラスのメソッド「next」とTextInputFormatクラスの「getRecordReader」をオーバーライドしています。(私は古いAPIを使用しており、段落の定義は、テキストファイルに空白行が入るまで追加されます。)
以下は私のコードです。
public class NLinesInputFormat extends TextInputFormat
{
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter)throws IOException {
reporter.setStatus(split.toString());
return new ParagraphRecordReader(conf, (FileSplit)split);
}
}
public class ParagraphRecordReader implements RecordReader<LongWritable, Text>
{
private LineRecordReader lineRecord;
private LongWritable lineKey;
private Text lineValue;
public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException {
lineRecord = new LineRecordReader(conf, split);
lineKey = lineRecord.createKey();
lineValue = lineRecord.createValue();
}
@Override
public void close() throws IOException {
lineRecord.close();
}
@Override
public LongWritable createKey() {
return new LongWritable();
}
@Override
public Text createValue() {
return new Text("");
}
@Override
public float getProgress() throws IOException {
return lineRecord.getPos();
}
@Override
public synchronized boolean next(LongWritable key, Text value) throws IOException {
boolean appended, gotsomething;
boolean retval;
byte space[] = {' '};
value.clear();
gotsomething = false;
do {
appended = false;
retval = lineRecord.next(lineKey, lineValue);
if (retval) {
if (lineValue.toString().length() > 0) {
byte[] rawline = lineValue.getBytes();
int rawlinelen = lineValue.getLength();
value.append(rawline, 0, rawlinelen);
value.append(space, 0, 1);
appended = true;
}
gotsomething = true;
}
} while (appended);
//System.out.println("ParagraphRecordReader::next() returns "+gotsomething+" after setting value to: ["+value.toString()+"]");
return gotsomething;
}
@Override
public long getPos() throws IOException {
return lineRecord.getPos();
}
}
質問:
1。これを行う方法について具体的なガイドが見つからなかったので、間違っていることがあるかもしれません。何か提案をコメントしてください。
2.これを正しくコンパイルできますが、ジョブを実行するとマッパーが継続的に実行され、問題がどこにあるのかわかりません。