7

シーケンス ファイルを読み取るカスタム InputFormat を作成できるようにしたいと考えていますが、さらにレコードが配置されているファイル内のファイル パスとオフセットを公開します。

一歩戻って、使用例を次に示します。可変サイズのデータ​​を含むシーケンス ファイルがあります。キーはほとんど無関係であり、値はさまざまな異なるフィールドを含む最大数メガバイトです。ファイル名とオフセットとともに、elasticsearch でこれらのフィールドのいくつかにインデックスを付けたいと思います。このようにして、elasticsearch からこれらのフィールドをクエリし、ファイル名とオフセットを使用してシーケンス ファイルに戻り、すべてを ES に格納する代わりに元のレコードを取得できます。

私はこのプロセス全体を単一の Java プログラムとして動作させています。SequenceFile.Reader クラスは、これを実現する便利なメソッドを提供getPositionします。seek

ただし、最終的には何テラバイトものデータが必要になるため、これを MapReduce ジョブ (おそらく Map のみ) に変換する必要があります。シーケンス ファイル内の実際のキーは無関係であるため、私が望んでいたアプローチは、SquenceFileInputFormat を拡張するか何らかの形で利用するカスタム InputFormat を作成することですが、実際のキーを返す代わりに、ファイルで構成される複合キーを返します。そしてオフセット。

しかし、それは実際にはより困難であることが証明されています。可能であるように思われますが、実際の API と公開されているものを考えると、難しいことです。何か案は?多分私が取るべき別のアプローチですか?

4

1 に答える 1

6

誰かが同様の問題に遭遇した場合に備えて、私が思いついた解決策を次に示します。SequenceFileInputFormat/RecordReader のコードの一部を複製し、それを変更するだけで終わりました。サブクラスかデコレータか何かを書きたいと思っていました...この方法はきれいではありませんが、うまくいきます:

SequenceFileOffsetInputFormat.java:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> {

    private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> {

        private SequenceFile.Reader in;
        private long start;
        private long end;
        private boolean more = true;
        private PathOffsetWritable key = null;
        private Writable k = null;
        private V value = null;
        private Configuration conf;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) split;
            conf = context.getConfiguration();
            Path path = fileSplit.getPath();
            FileSystem fs = path.getFileSystem(conf);
            this.in = new SequenceFile.Reader(fs, path, conf);
            try {
                this.k = (Writable) in.getKeyClass().newInstance();
                this.value = (V) in.getValueClass().newInstance();
            } catch (InstantiationException e) {
                throw new IOException(e);
            } catch (IllegalAccessException e) {
                throw new IOException(e);
            }
            this.end = fileSplit.getStart() + fileSplit.getLength();

            if (fileSplit.getStart() > in.getPosition()) {
                in.sync(fileSplit.getStart());
            }

            this.start = in.getPosition();
            more = start < end;

            key = new PathOffsetWritable(path, start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!more) {
                return false;
            }
            long pos = in.getPosition();

            more = in.next(k, value);
            if (!more || (pos >= end && in.syncSeen())) {
                key = null;
                value = null;
                more = false;
            } else {
                key.setOffset(pos);
            }
            return more;
        }

        @Override
        public PathOffsetWritable getCurrentKey() {
            return key;
        }

        @Override
        public V getCurrentValue() {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (end == start) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
            }
        }

        @Override
        public void close() throws IOException {
            in.close();
        }

    }

    @Override
    public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new SequenceFileOffsetRecordReader<V>();
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context);
    }

    @Override
    public long getFormatMinSplitSize() {
        return SequenceFile.SYNC_INTERVAL;
    }


}

PathOffsetWritable.java:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> {

    private Text t = new Text();
    private Path path;
    private long offset;

    public PathOffsetWritable(Path path, long offset) {
        this.path = path;
        this.offset = offset;
    }

    public Path getPath() {
        return path;
    }

    public long getOffset() {
        return offset;
    }

    public void setPath(Path path) {
        this.path = path;
    }

    public void setOffset(long offset) {
        this.offset = offset;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        t.readFields(in);
        path = new Path(t.toString());
        offset = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        t.set(path.toString());
        t.write(out);
        out.writeLong(offset);
    }

    @Override
    public int compareTo(PathOffsetWritable o) {
        int x = path.compareTo(o.path);
        if (x != 0) {
            return x;
        } else {
            return Long.valueOf(offset).compareTo(Long.valueOf(o.offset));
        }
    }


}
于 2013-09-06T00:22:20.463 に答える