6

段落全体を1行ずつではなくマッパーに送信するために、RecordReaderクラスのメソッド「next」とTextInputFormatクラスの「getRecordReader」をオーバーライドしています。(私は古いAPIを使用しており、段落の定義は、テキストファイルに空白行が入るまで追加されます。)
以下は私のコードです。

public class NLinesInputFormat extends TextInputFormat  
{  
   @Override
   public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter)throws IOException     {   
        reporter.setStatus(split.toString());  
        return new ParagraphRecordReader(conf, (FileSplit)split);
    }
}



public class ParagraphRecordReader implements RecordReader<LongWritable, Text> 
{
        private LineRecordReader lineRecord;
        private LongWritable lineKey;
        private Text lineValue;
        public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException {
            lineRecord = new LineRecordReader(conf, split);
            lineKey = lineRecord.createKey();
            lineValue = lineRecord.createValue();
        }

        @Override
        public void close() throws IOException {
            lineRecord.close();
        }

        @Override
        public LongWritable createKey() {
            return new LongWritable();

        }

        @Override
        public Text createValue() {
            return new Text("");

        }

        @Override
        public float getProgress() throws IOException {
            return lineRecord.getPos();

        }

        @Override
        public synchronized boolean next(LongWritable key, Text value) throws IOException {
            boolean appended, gotsomething;
            boolean retval;
            byte space[] = {' '};
            value.clear();
            gotsomething = false;
            do {
                appended = false;
                retval = lineRecord.next(lineKey, lineValue);
                if (retval) {
                    if (lineValue.toString().length() > 0) {
                        byte[] rawline = lineValue.getBytes();
                        int rawlinelen = lineValue.getLength();
                        value.append(rawline, 0, rawlinelen);
                        value.append(space, 0, 1);
                        appended = true;
                    }
                    gotsomething = true;
                }
            } while (appended);

            //System.out.println("ParagraphRecordReader::next() returns "+gotsomething+" after setting value to: ["+value.toString()+"]");
            return gotsomething;
        }

        @Override
        public long getPos() throws IOException {
            return lineRecord.getPos();
        }
    }  

質問:
1。これを行う方法について具体的なガイドが見つからなかったので、間違っていることがあるかもしれません。何か提案をコメントしてください。
2.これを正しくコンパイルできますが、ジョブを実行するとマッパーが継続的に実行され、問題がどこにあるのかわかりません。

4

1 に答える 1

3

あなたのコードは私にとって完全にうまく機能します。私が行った唯一の変更は、これらのクラスを内部クラスとして持ち、静的にすることでした。

入力ファイルは次のとおりです。

This is awesome.
WTF is this.

This is just a test.

マッパーコードは次のようになりました。

@Override
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
    throws IOException {

    System.out.println(key+" : "+value);
}

そして、出力は次のとおりです。

0 : This is awesome. WTF is this. 
0 : This is just a test.

入力フォーマットの設定を忘れていないことは間違いありませんが、念のため、次のように設定してください。

conf.setInputFormat(NLinesInputFormat.class);
于 2013-03-25T09:18:42.370 に答える