私は Hadoop と Java に慣れていないので、明らかに足りないものがあると感じています。それが何かを意味する場合、私はHadoop 1.0.3を使用しています。
Hadoop を使用する私の目標は、一連のファイルを取得し、それらを一度に 1 ファイルずつ解析することです (1 行ずつではなく)。各ファイルは複数のキー値を生成しますが、他の行へのコンテキストが重要です。キーと値は複数値/複合であるため、キーには WritableCompare を、値には Writable を実装しました。各ファイルの処理には CPU が少しかかるため、マッパーの出力を保存してから、後で複数のレデューサーを実行したいと考えています。
複合キーについては、[http://stackoverflow.com/questions/12427090/hadoop-composite-key][1] に従いました。
問題は、複合キーと値ではなく、出力が単なる Java オブジェクト参照であることです。例:
LinkKeyWritable@bd2f9730 LinkValueWritable@8752408c
問題がデータをまったく削減しないことに関連しているかどうかはわかりません。
これが私のメインクラスです:
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(Parser.class);
conf.setJobName("raw_parser");
conf.setOutputKeyClass(LinkKeyWritable.class);
conf.setOutputValueClass(LinkValueWritable.class);
conf.setMapperClass(RawMap.class);
conf.setNumMapTasks(0);
conf.setInputFormat(PerFileInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
PerFileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
そして私のMapperクラス:
public class RawMap extends MapReduceBase implement Mapper {
public void map(NullWritable key, Text value,
OutputCollector<LinkKeyWritable, LinkValueWritable> output,
Reporter reporter) throws IOException {
String json = value.toString();
SerpyReader reader = new SerpyReader(json);
GoogleParser parser = new GoogleParser(reader);
for (String page : reader.getPages()) {
String content = reader.readPageContent(page);
parser.addPage(content);
}
for (Link link : parser.getLinks()) {
LinkKeyWritable linkKey = new LinkKeyWritable(link);
LinkValueWritable linkValue = new LinkValueWritable(link);
output.collect(linkKey, linkValue);
}
}
}
Link は基本的に、LinkKeyWritable と LinkValueWritable の間で分割されるさまざまな情報の構造体です。
LinkKeyWritable:
public class LinkKeyWritable implements WritableComparable<LinkKeyWritable>{
protected Link link;
public LinkKeyWritable() {
super();
link = new Link();
}
public LinkKeyWritable(Link link) {
super();
this.link = link;
}
@Override
public void readFields(DataInput in) throws IOException {
link.batchDay = in.readLong();
link.source = in.readUTF();
link.domain = in.readUTF();
link.path = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(link.batchDay);
out.writeUTF(link.source);
out.writeUTF(link.domain);
out.writeUTF(link.path);
}
@Override
public int compareTo(LinkKeyWritable o) {
return ComparisonChain.start().
compare(link.batchDay, o.link.batchDay).
compare(link.domain, o.link.domain).
compare(link.path, o.link.path).
result();
}
@Override
public int hashCode() {
return Objects.hashCode(link.batchDay, link.source, link.domain, link.path);
}
@Override
public boolean equals(final Object obj){
if(obj instanceof LinkKeyWritable) {
final LinkKeyWritable o = (LinkKeyWritable)obj;
return Objects.equal(link.batchDay, o.link.batchDay)
&& Objects.equal(link.source, o.link.source)
&& Objects.equal(link.domain, o.link.domain)
&& Objects.equal(link.path, o.link.path);
}
return false;
}
}
LinkValueWritable:
public class LinkValueWritable implements Writable{
protected Link link;
public LinkValueWritable() {
link = new Link();
}
public LinkValueWritable(Link link) {
this.link = new Link();
this.link.type = link.type;
this.link.description = link.description;
}
@Override
public void readFields(DataInput in) throws IOException {
link.type = in.readUTF();
link.description = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(link.type);
out.writeUTF(link.description);
}
@Override
public int hashCode() {
return Objects.hashCode(link.type, link.description);
}
@Override
public boolean equals(final Object obj){
if(obj instanceof LinkKeyWritable) {
final LinkKeyWritable o = (LinkKeyWritable)obj;
return Objects.equal(link.type, o.link.type)
&& Objects.equal(link.description, o.link.description);
}
return false;
}
}