オブジェクト (クリック) がデータセットに表示される頻度をカウントする Hadoop ジョブを実装しようとしています。したがって、カスタムファイル入力形式を作成しました。レコード リーダーは、指定されたファイルの最初の行のみを読み取り、入力ストリームを閉じるようです。
コードは次のとおりです。
Pojo クラス:
package model;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Click implements WritableComparable<Click> {
private String user;
private String clickStart;
private String date;
private String clickTarget;
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(user);
out.writeUTF(clickStart);
out.writeUTF(date);
out.writeUTF(clickTarget);
}
@Override
public void readFields(DataInput in) throws IOException {
user = in.readUTF();
clickStart = in.readUTF();
date = in.readUTF();
clickTarget = in.readUTF();
}
public int compareTo(Click arg0) {
int response = clickTarget.compareTo(arg0.clickTarget);
if (response == 0) {
response = date.compareTo(arg0.date);
}
return response;
}
public String getUser(String user) {
return this.user;
}
public void setUser(String user) {
this.user = user;
}
public String getClickStart() {
return clickStart;
}
public void setClickStart(String clickStart) {
this.clickStart = clickStart;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getClickTarget() {
return clickTarget;
}
public void setClickTarget(String clickTarget) {
this.clickTarget = clickTarget;
}
public String toString() {
return clickStart + "\t" + date;
}
}
FileInputFormat クラスは次のとおりです。
package ClickAnalysis;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import model.Click;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.tools.ant.types.CommandlineJava.SysProperties;
public class ClickAnalysisInputFormat extends FileInputFormat<Click, IntWritable>{
@Override
public RecordReader<Click, IntWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
System.out.println("Creating Record Reader");
return new ClickReader();
}
public static class ClickReader extends RecordReader<Click, IntWritable> {
private BufferedReader in;
private Click key;
private IntWritable value;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
key = new Click();
value = new IntWritable(1);
System.out.println("Starting to read ...");
FileSplit split = (FileSplit) inputSplit;
Configuration conf = context.getConfiguration();
Path path = split.getPath();
InputStream is = path.getFileSystem(conf).open(path);
in = new BufferedReader(new InputStreamReader(is));
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
String line = in.readLine();
System.out.println("line: " + line);
boolean hasNextKeyValue;
if (line == null) {
System.out.println("line is null");
hasNextKeyValue = false;
} else {
String[] click = StringUtils.split(line, '\\', ';');
System.out.println(click[0].toString());
System.out.println(click[1].toString());
System.out.println(click[2].toString());
key.setClickStart(click[0].toString());
key.setDate(click[1].toString());
key.setClickTarget(click[2].toString());
value.set(1);
System.out.println("done with first line");
hasNextKeyValue = true;
}
System.out.println(hasNextKeyValue);
return hasNextKeyValue;
}
@Override
public Click getCurrentKey() throws IOException, InterruptedException {
return this.key;
}
@Override
public IntWritable getCurrentValue() throws IOException, InterruptedException {
return this.value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
public void close() throws IOException {
in.close();
System.out.println("in closed");
}
}
}
マッパークラス:
package ClickAnalysis;
import java.io.IOException;
import model.Click;
import model.ClickStartTarget;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.jruby.RubyProcess.Sys;
public class ClickAnalysisMapper extends Mapper<Click, IntWritable, Click, IntWritable> {
private static final IntWritable outputValue = new IntWritable();
@Override
protected void map(Click key, IntWritable value, Context context) throws IOException, InterruptedException {
System.out.println("Key: " + key.getClickStart() + " " + key.getDate() + " " + key.getClickTarget() + " Value: " + value);
outputValue.set(value.get());
System.out.println(outputValue.get());
context.write(key, outputValue);
System.out.println("nach context");
}
}
パーティショナー クラス:
package ClickAnalysis;
import model.Click;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class ClickAnalysisPartitioner extends Partitioner<Click, IntWritable> {
@Override
public int getPartition(Click key, IntWritable value, int numPartitions) {
System.out.println("in Partitioner drinnen");
int partition = numPartitions;
return partition;
}
}
サーブレット コンテナー内の Restful Web サービス呼び出しを介してトリガーされる Hadoop ジョブですが、これは問題にはなりません。
package ClickAnalysis;
import model.Click;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class ClickAnalysisJob {
public int run() throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "ClickAnalysisJob");
job.setJarByClass(ClickAnalysisJob.class);
// Job Input path
FileInputFormat.setInputPaths(job, "hdfs://localhost:9000/user/hadoop/testdata1.csv");
// Job Output path
Path out = new Path("hdfs://localhost:9000/user/hadoop/clickdataAnalysis_out");
FileOutputFormat.setOutputPath(job, out);
out.getFileSystem(conf).delete(out,true);
job.setMapperClass(ClickAnalysisMapper.class);
job.setReducerClass(Reducer.class);
job.setPartitionerClass(ClickAnalysisPartitioner.class);
//job.setReducerClass(ClickAnalysisReducer.class);
job.setInputFormatClass(ClickAnalysisInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Click.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(Click.class);
job.setMapOutputValueClass(IntWritable.class);
System.out.println("in run drinnen");
//job.setGroupingComparatorClass(ClickTargetAnalysisComparator.class);
job.setNumReduceTasks(1);
int result = job.waitForCompletion(true)? 0:1;
return result;
}
}
次のデータセット (例):
/web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein anderes ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein anderes ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein drittes ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein viertes ziel
/web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
プログラムを実行すると、syso は次のように表示されます。
in run drinnen
Creating Record Reader
Starting to read ...
line: /web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
/web/big-data-test-site/test-seite-1
2014-07-08
ein ziel
done with first line
true
Key: /web/big-data-test-site/test-seite-1 2014-07-08 ein ziel Value: 1
1
in closed
analyze Method: 1
そのことから、レコードリーダーは最初の行のみを読み取ると結論付けています。なぜこの問題が発生し、どのように修正されますか?