2

Hadoop 0.20.0 / 0.20.2 の CombineFileInputFormat を使用して、レコードごとに 1 ファイルを処理し、データのローカリティ (通常は処理する) を妥協しないようにしたいと考えています。

Tom White の Hadoop Definitive Guide で言及されていますが、彼はその方法を示していません。代わりに、彼はシーケンス ファイルに進みます。

レコードリーダーで処理された変数の意味について、私はかなり混乱しています。どのコード例も非常に役立ちます。

前もって感謝します..

4

2 に答える 2

1

結合されたファイルの入力形式に使用される以下の入力形式を確認してください。

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;


/**
 * CustomInputformat which implements the createRecordReader of abstract class CombineFileInputFormat
 */

public class MyCombineFileInputFormat extends CombineFileInputFormat {

    public static class MyRecordReader extends RecordReader<LongWritable,Text>{
        private LineRecordReader delegate=null;
        private int idx;

        public MyRecordReader(CombineFileSplit split,TaskAttemptContext taskcontext ,Integer idx) throws IOException {
            this.idx=idx;
            delegate = new LineRecordReader();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }

        @Override
        public float getProgress() {
            try {
                return delegate.getProgress();
            }
            catch(Exception e) {
                return 0;
            }
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext taskcontext) throws IOException {
            CombineFileSplit csplit=(CombineFileSplit)split;
            FileSplit fileSplit = new FileSplit(csplit.getPath(idx), csplit.getOffset(idx), csplit.getLength(idx), csplit.getLocations());
            delegate.initialize(fileSplit, taskcontext);
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,
                InterruptedException {
            return delegate.getCurrentKey();
        }


        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return delegate.getCurrentValue();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return delegate.nextKeyValue();
        }

    }

    @SuppressWarnings("unchecked")
    @Override
    public RecordReader createRecordReader(InputSplit split,TaskAttemptContext taskcontext) throws IOException {
        return new CombineFileRecordReader((CombineFileSplit) split, taskcontext, MyRecordReader.class);
    }
}
于 2013-09-30T09:43:44.007 に答える
0

いわゆる「新しい API」から CombineFileInputFormat を使用する最も簡単な方法を次に示します。実際の入力形式がMyFormatで、 MyKeyのキーとMyValueの値で動作するとします (SequenceFileInputFormat< MyKey, MyValue >たとえば、 のサブクラスである可能性があります)。

public class CombinedMyFormat extends CombineFileInputFormat< MyKey, MyValue > {
    // exists merely to fix the key/value types and
    // inject the delegate format to the superclass
    // if MyFormat does not use state, consider a constant instead
    private static class CombineMyKeyMyValueReaderWrapper
    extends CombineFileRecordReaderWrapper< MyKey, MyValue > {
        protected CombineMyKeyMyValueReaderWrapper(
            CombineFileSplit split, TaskAttemptContext ctx, Integer idx
        ) throws IOException, InterruptedException {
            super( new MyFormat(), split, ctx, idx );
        }
    }

    @Override
    public RecordReader< MyKey, MyValue > createRecordReader(
        InputSplit split, TaskAttemptContext ctx
    ) throws IOException {
        return new CombineFileRecordReader< MyKey, MyValue >(
            ( CombineFileSplit )split, ctx, CombineMyKeyMyValueReaderWrapper.class
        );
    }
}

ジョブ ドライバーで、 に立ち寄ることができるようになりましCombinedMyFormatMyFormat。また、最大分割サイズ プロパティを設定して、Hadoop が入力全体を 1 つの分割に結合しないようにする必要があります。

于 2015-09-21T22:08:32.977 に答える