1

私は Hadoop と Java に慣れていないので、明らかに足りないものがあると感じています。それが何かを意味する場合、私はHadoop 1.0.3を使用しています。

Hadoop を使用する私の目標は、一連のファイルを取得し、それらを一度に 1 ファイルずつ解析することです (1 行ずつではなく)。各ファイルは複数のキー値を生成しますが、他の行へのコンテキストが重要です。キーと値は複数値/複合であるため、キーには WritableCompare を、値には Writable を実装しました。各ファイルの処理には CPU が少しかかるため、マッパーの出力を保存してから、後で複数のレデューサーを実行したいと考えています。

複合キーについては、[http://stackoverflow.com/questions/12427090/hadoop-composite-key][1] に従いました。

問題は、複合キーと値ではなく、出力が単なる Java オブジェクト参照であることです。例: LinkKeyWritable@bd2f9730 LinkValueWritable@8752408c

問題がデータをまったく削減しないことに関連しているかどうかはわかりません。

これが私のメインクラスです:

public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf(Parser.class);
  conf.setJobName("raw_parser");

  conf.setOutputKeyClass(LinkKeyWritable.class);
  conf.setOutputValueClass(LinkValueWritable.class);

  conf.setMapperClass(RawMap.class);
  conf.setNumMapTasks(0);

  conf.setInputFormat(PerFileInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  PerFileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
}

そして私のMapperクラス:

public class RawMap extends MapReduceBase implement Mapper {

    public void map(NullWritable key, Text value,
            OutputCollector<LinkKeyWritable, LinkValueWritable> output,
            Reporter reporter) throws IOException {
        String json = value.toString();
        SerpyReader reader = new SerpyReader(json);
        GoogleParser parser = new GoogleParser(reader);
        for (String page : reader.getPages()) {
            String content = reader.readPageContent(page);
            parser.addPage(content);
        }
        for (Link link : parser.getLinks()) {
            LinkKeyWritable linkKey = new LinkKeyWritable(link);
            LinkValueWritable linkValue = new LinkValueWritable(link);
            output.collect(linkKey, linkValue);
        }
    }
}

Link は基本的に、LinkKeyWritable と LinkValueWritable の間で分割されるさまざまな情報の構造体です。

LinkKeyWritable:

public class LinkKeyWritable implements WritableComparable<LinkKeyWritable>{
    protected Link link;

    public LinkKeyWritable() {
        super();
        link = new Link();
    }

    public LinkKeyWritable(Link link) {
        super();
        this.link = link;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        link.batchDay = in.readLong();
        link.source = in.readUTF();
        link.domain = in.readUTF();
        link.path = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(link.batchDay);
        out.writeUTF(link.source);
        out.writeUTF(link.domain);
        out.writeUTF(link.path);
    }

    @Override
    public int compareTo(LinkKeyWritable o) {
        return ComparisonChain.start().
                compare(link.batchDay, o.link.batchDay).
                compare(link.domain, o.link.domain).
                compare(link.path, o.link.path).
                result();
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(link.batchDay, link.source, link.domain, link.path);
    }

    @Override
    public boolean equals(final Object obj){
        if(obj instanceof LinkKeyWritable) {
            final LinkKeyWritable o = (LinkKeyWritable)obj;
            return Objects.equal(link.batchDay, o.link.batchDay)
                    && Objects.equal(link.source, o.link.source)
                    && Objects.equal(link.domain, o.link.domain)
                    && Objects.equal(link.path, o.link.path);
        }
        return false;
    }
}

LinkValueWritable:

public class LinkValueWritable implements Writable{
    protected Link link;

    public LinkValueWritable() {
        link = new Link();
    }

    public LinkValueWritable(Link link) {
        this.link = new Link();
        this.link.type = link.type;
        this.link.description = link.description;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        link.type = in.readUTF();
        link.description = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(link.type);
        out.writeUTF(link.description);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(link.type, link.description);
    }

    @Override
    public boolean equals(final Object obj){
        if(obj instanceof LinkKeyWritable) {
            final LinkKeyWritable o = (LinkKeyWritable)obj;
            return Objects.equal(link.type, o.link.type)
                    && Objects.equal(link.description, o.link.description);
        }
        return false;
    }
}
4

2 に答える 2

7

答えはTextOutputFormatの実装にあると思います。具体的には、LineRecordWriter の writeObject メソッド:

/**
 * Write the object to the byte stream, handling Text as a special
 * case.
 * @param o the object to print
 * @throws IOException if the write throws, we pass it on
 */
private void writeObject(Object o) throws IOException {
  if (o instanceof Text) {
    Text to = (Text) o;
    out.write(to.getBytes(), 0, to.getLength());
  } else {
    out.write(o.toString().getBytes(utf8));
  }
}

ご覧のとおり、キーまたは値が Text オブジェクトでない場合、toString メソッドを呼び出してそれを書き出します。キーと値で toString を実装しないままにしておいたので、参照を書き出す Object クラスの実装を使用しています。

適切な toString 関数を作成するか、別の OutputFormat を使用してみてください。

于 2012-10-17T20:27:35.393 に答える
2

希望どおりのオブジェクトのリストがあるようです。見苦しい Java 参照の代わりに、人間が読めるバージョンを出力したい場合は、書き込み可能オブジェクトに toString() を実装する必要があります。

于 2012-10-17T20:20:32.323 に答える