コード
Action Book で Hadoop の DataJoin の例を実行してみました。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
// import org.apache.commons.logging.Log;
// import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.*;
public class MultiDataSetJoinMR extends Configured implements Tool
{
public static class MapClass extends DataJoinMapperBase
{
protected Text generateInputTag(String inputFile)
{
String datasource = inputFile.split("-")[0];
return new Text(datasource);
}
protected Text generateGroupKey(TaggedMapOutput aRecord)
{
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(",");
String groupKey = tokens[0];
return new Text(groupKey);
}
protected TaggedMapOutput generateTaggedMapOutput(Object value)
{
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag);
return retv;
}
}
public static class Reduce extends DataJoinReducerBase
{
protected TaggedMapOutput combine(Object[] tags, Object[] values)
{
if (tags.length < 2) return null;
String joinedStr = "";
for (int i=0; i<values.length; i++)
{
if (i > 0) joinedStr += ",";
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
String[] tokens = line.split(",", 2);
joinedStr += tokens[1];
}
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}
public static class TaggedWritable extends TaggedMapOutput
{
private Writable data;
public TaggedWritable() {
this.tag = new Text();
}
public TaggedWritable(Writable data)
{
this.tag = new Text("");
this.data = data;
}
public Writable getData()
{
return data;
}
public void write(DataOutput out) throws IOException
{
this.tag.write(out);
this.data.write(out);
}
public void readFields(DataInput in) throws IOException
{
this.tag.readFields(in);
this.data.readFields(in);
}
}
public int run(String[] args) throws Exception
{
Configuration conf = getConf();
JobConf job = new JobConf(conf, MultiDataSetJoinMR.class);
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("DataJoin");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception
{
int res = ToolRunner.run(new Configuration(),
new MultiDataSetJoinMR(),
args);
System.exit(res);
}
}
実行中のコマンド
./hadoop jar MultiDataSetJoin.jar /home/project/dataset /home/project/out
エラー
しかし、私は次の問題に直面しています。
15 Mar, 2013 4:29:45 PM org.apache.hadoop.metrics.jvm.JvmMetrics init
INFO: Initializing JVM Metrics with processName=JobTracker, sessionId=
15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.JobClient configureCommandLineOptions
WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.FileInputFormat listStatus
INFO: Total input paths to process : 2
15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.FileInputFormat listStatus
INFO: Total input paths to process : 2
15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.MapTask runOldMapper
INFO: numReduceTasks: 1
15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.LocalJobRunner$Job run
WARNING: job_local_0001
java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:354)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
... 5 more
Caused by: java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:34)
... 10 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
... 13 more
Caused by: java.lang.NullPointerException
at MultiDataSetJoinMR$MapClass.generateInputTag(MultiDataSetJoinMR.java:31)
at org.apache.hadoop.contrib.utils.join.DataJoinMapperBase.configure(DataJoinMapperBase.java:60)
... 18 more
null15 Mar, 2013 4:29:46 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 0% reduce 0%
15 Mar, 2013 4:29:46 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
15 Mar, 2013 4:29:46 PM org.apache.hadoop.mapred.Counters log
INFO: Counters: 0
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1252)
at MultiDataSetJoinMR.run(MultiDataSetJoinMR.java:123)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at MultiDataSetJoinMR.main(MultiDataSetJoinMR.java:128)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
ログ トレースから、inputFile 変数が以下のメソッドで null 値を取得することを識別できます。
protected Text generateInputTag(String inputFile)
{
String datasource = inputFile.split("-")[0];
return new Text(datasource);
}
どこから呼び出され、どのように修正するのかわかりません。誰でも私を助けてくれますか