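I have a MapReduce tool that freezes on the first mapper with no discernible output. It is a single-node install, so I have not been able to reach the job tracker web interface to debug it. The behavior occurs regardless of the input file size. I have been hacking at this all day and am ready to pull my hair out. The output looks like this: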
13/09/12 15:12:14 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/09/12 15:12:14 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/09/12 15:12:14 INFO input.FileInputFormat: Total input paths to process : 1
13/09/12 15:12:14 INFO mapred.JobClient: Running job: job_local1132137425_0001
13/09/12 15:12:14 INFO mapred.LocalJobRunner: Waiting for map tasks
13/09/12 15:12:14 INFO mapred.LocalJobRunner: Starting task: attempt_local1132137425_0001_m_000000_0
13/09/12 15:12:14 INFO util.ProcessTree: setsid exited with exit code 0
13/09/12 15:12:14 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@339c98d3
13/09/12 15:12:14 INFO mapred.MapTask: Processing split: file:/home/axelmagn/EclipseWorkspace/AxelMagnusonCoursework/assign-2/data/in/input.csv:0+33554432
13/09/12 15:12:14 WARN snappy.LoadSnappy: Snappy native library not loaded
13/09/12 15:12:14 INFO mapred.MapTask: io.sort.mb = 100
13/09/12 15:12:14 INFO mapred.MapTask: data buffer = 79691776/99614720
13/09/12 15:12:14 INFO mapred.MapTask: record buffer = 262144/327680
13/09/12 15:12:15 INFO mapred.JobClient: map 0% reduce 0%
13/09/12 15:12:15 INFO mapred.MapTask: Starting flush of map output
13/09/12 15:12:15 INFO mapred.MapTask: Starting flush of map output
13/09/12 15:12:20 INFO mapred.LocalJobRunner:
13/09/12 15:12:21 INFO mapred.JobClient: map 20% reduce 0%
And then it hangs indefinitely.
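Because LocalJobRunner runs the tasks inside the client JVM, one way to see where the job is stuck without the web interface is to dump every thread's stack once the hang has set in. The following is a minimal sketch using only the JDK; `ThreadDumper`, `dumpAllThreads`, and `dumpAfter` are hypothetical helper names, not part of Hadoop.

```java
import java.util.Map;

public class ThreadDumper {

    /** Builds a printable snapshot of every live thread and its stack. */
    public static String dumpAllThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry
                : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            sb.append('"').append(thread.getName()).append('"')
              .append(" state=").append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    /** Starts a daemon thread that prints one dump to stderr after delayMillis. */
    public static Thread dumpAfter(final long delayMillis) {
        Thread watchdog = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(delayMillis);
                    System.err.println(dumpAllThreads());
                } catch (InterruptedException e) {
                    // Cancelled before the delay elapsed; exit quietly.
                    Thread.currentThread().interrupt();
                }
            }
        }, "hang-watchdog");
        watchdog.setDaemon(true);
        watchdog.start();
        return watchdog;
    }

    public static void main(String[] args) {
        System.out.println(dumpAllThreads());
    }
}
```

Calling `ThreadDumper.dumpAfter(30000)` just before `job1.waitForCompletion(true)` would print the stacks about 30 seconds in, which should show what the map task thread is blocked on. Running the JDK's `jstack` against the process ID gives the same information without any code changes.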
The Tool routine (abridged):
tempPath = new Path("/tmp/" + outDirPath.getName() + "_1_" + now);
tempPath2 = new Path("/tmp/" + outDirPath.getName() + "_2_" + now);
job1 = new VisitorCountJob(inFilePath, tempPath);
success = job1.waitForCompletion(true);
if (!success)
    throw new Exception("Visitor Count Job Failed.");
job2 = new TopVisitorJob(tempPath, outDirPath, TOPN);
success = job2.waitForCompletion(true);
return success ? 0 : 1;
Job:
public class VisitorCountJob extends Job {
    public static final String TAB = "\t";

    public VisitorCountJob(Path inputPath, Path outputPath)
            throws IOException {
        super();
        this.setJarByClass(VisitorCountJob.class);
        this.setJobName("Visitor Count");
        this.setInputFormatClass(VisitInputFormat.class);
        VisitInputFormat.setInputPaths(this, inputPath);
        FileOutputFormat.setOutputPath(this, outputPath);
        this.setMapperClass(VisitorCountMapper.class);
        this.setReducerClass(VisitorCountReducer.class);
        this.setOutputKeyClass(Person.class);
        this.setOutputValueClass(IntWritable.class);
        this.setOutputFormatClass(SequenceFileOutputFormat.class);
    }
}
Mapper:
public class VisitorCountMapper extends
        Mapper<LongWritable, Visit, Person, IntWritable> {
    @Override
    public void map(LongWritable key, Visit value, Context context)
            throws IOException, InterruptedException {
        try {
            Person visitor = value.getVisitor();
            context.write(visitor, new IntWritable(1));
        } catch (IOException e) {
            e.printStackTrace();
            throw e;
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw e;
        }
    }
}
Reducer:
public class VisitorCountReducer extends
        Reducer<Person, IntWritable, Person, IntWritable> {
    @Override
    public void reduce(Person visitor, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(visitor, new IntWritable(count));
    }
}
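I also wrote an InputFormat and a RecordReader that generate Visit objects from raw text, but I will omit them for brevity unless someone thinks they are relevant.
I am really at my wit's end, so any help is greatly appreciated.
Edit: Since there was interest, here are some of my data type implementations.
Person: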
public class Person implements WritableComparable<Person> {
    public Text firstName;
    public Text lastName;

    public Person() {}

    public Person(Text firstName, Text lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public Person(String firstName, String lastName) {
        this(new Text(firstName), new Text(lastName));
    }

    public void readFields(DataInput in) throws IOException {
        firstName.readFields(in);
        lastName.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
        firstName.write(out);
        lastName.write(out);
    }

    public int compareTo(Person other) {
        int out;
        // give sorting preference to first name
        out = firstName.compareTo(other.firstName);
        if (out != 0)
            return out;
        return lastName.compareTo(other.lastName);
    }
}
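One detail in the Person listing may be worth ruling out: `public Person() {}` leaves `firstName` and `lastName` null, while `readFields` calls methods on both. Hadoop creates Writable instances through the no-argument constructor before calling `readFields` (for example when comparing serialized keys during the map-side sort), so that path would throw a NullPointerException. Whether that explains the hang here is unconfirmed. The sketch below reproduces the pattern with the JDK only; `WritableLike`, `Name`, `NullFieldPerson`, `SafePerson`, and `copy` are hypothetical stand-ins for illustration, not Hadoop classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class PersonRoundTrip {

    /** Hypothetical stand-in for Hadoop's Writable contract. */
    interface WritableLike {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    /** Hypothetical stand-in for Text: a mutable string holder. */
    static class Name implements WritableLike {
        String value = "";
        Name() {}
        Name(String value) { this.value = value; }
        public void write(DataOutput out) throws IOException { out.writeUTF(value); }
        public void readFields(DataInput in) throws IOException { value = in.readUTF(); }
    }

    /** Same shape as Person above: the no-arg constructor leaves both fields null. */
    static class NullFieldPerson implements WritableLike {
        Name firstName;
        Name lastName;
        NullFieldPerson() {}
        NullFieldPerson(String first, String last) {
            firstName = new Name(first);
            lastName = new Name(last);
        }
        public void write(DataOutput out) throws IOException {
            firstName.write(out);
            lastName.write(out);
        }
        public void readFields(DataInput in) throws IOException {
            firstName.readFields(in); // NullPointerException on a blank instance
            lastName.readFields(in);
        }
    }

    /** Variant whose no-arg constructor allocates empty fields first. */
    static class SafePerson extends NullFieldPerson {
        SafePerson() {
            firstName = new Name();
            lastName = new Name();
        }
        SafePerson(String first, String last) { super(first, last); }
    }

    /** Serializes src, then reads the bytes back into the blank instance dst. */
    static void copy(WritableLike src, WritableLike dst) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            src.write(new DataOutputStream(buffer));
            dst.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SafePerson restored = new SafePerson();
        copy(new SafePerson("Ada", "Lovelace"), restored);
        System.out.println(restored.firstName.value + " " + restored.lastName.value);
        try {
            copy(new NullFieldPerson("Ada", "Lovelace"), new NullFieldPerson());
        } catch (NullPointerException e) {
            System.out.println("blank NullFieldPerson failed in readFields");
        }
    }
}
```

If the same failure shows up with the real Person class, initializing both Text fields in the no-argument constructor would be the corresponding change. A custom key type would normally also override `hashCode()` and `equals()`, since the default partitioner hashes keys once more than one reducer is in use.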
VisitInputFormat:
public class VisitInputFormat extends FileInputFormat<LongWritable, Visit> {
    public RecordReader<LongWritable, Visit> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        VisitRecordReader reader = new VisitRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
VisitRecordReader:
public class VisitRecordReader extends RecordReader<LongWritable, Visit> {
    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public VisitRecordReader() {
        lineReader = new LineRecordReader();
    }

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        lineReader.initialize(genericSplit, context);
    }

    public boolean nextKeyValue() throws IOException {
        return lineReader.nextKeyValue();
    }

    public LongWritable getCurrentKey() {
        return lineReader.getCurrentKey();
    }

    public Visit getCurrentValue() {
        String raw = lineReader.getCurrentValue().toString();
        return new Visit(raw);
    }

    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    public void close() throws IOException {
        lineReader.close();
    }
}