イベントが発生した時間に基づいて、Mapper クラスからのイベントにシーケンス番号を割り当てたいと考えています。
たとえば、時間を含む 100 のイベントがあります。時間に基づいてそれらをソートし、リデューサーフェーズでそれらにシーケンス番号を割り当てたいと思います。また、レデューサー フェーズで重複しているレコードを削除します (同じイベントが同時に発生している場合)。
マッパーの方法:
public class EventMapper extends Mapper<LongWritable, Text, Text, Event> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
Text newKey;
Event e = new Event();
e.setAllValues(line);
newKey = new Text(e.getKey());
context.write(newKey, e);
}
}
リデューサーメソッド(私が望むもの):
public class EventReducer extends Reducer<Text, Event, Text, Text> {
public void reduce(Text key, Iterator<Event> itrtr, Context context) throws IOException, InterruptedException {
Event e;
List<Event> l = new ArrayList<Event>();
while(itrtr.hasNext()){
e = itrtr.next();
l.add(e);
}
Collections.sort(l);
long i = 1;
for (Event event : l) {
event.setId(++i);
context.write(key, new Text(event.toString()));
}
}
}
すべての ID を 0 として取得します。どうすればこれを達成できますか? 私は間違ったアプローチに従っていますか?
イベントクラスは次のとおりです。
public class Event implements Writable, WritableComparable<Event> {
//Some variables and getter + setters
@Override
public String toString() {
String delimiter1 = "|";
return this.date + delimiter1
+ this.evName + delimiter1
+ this.evType + delimiter1
+ this.evValue + delimiter1
+ this.name + delimiter1
+ this.id;
}
@Override
public void readFields(DataInput in) throws IOException {
try {
this.date = converStringToDate((WritableUtils.readString(in)).toString(), dateFormat);
} catch (ParseException ex) {
System.out.println("Wront date . Pe");
}
this.evName = WritableUtils.readString(in);
this.evType = WritableUtils.readString(in);
this.evValue = WritableUtils.readString(in);
this.name = WritableUtils.readString(in);
this.id = WritableUtils.readVLong(in);
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
WritableUtils.writeString(out, this.convertDateToString(date));
WritableUtils.writeString(out, evName);
WritableUtils.writeString(out, evType);
WritableUtils.writeString(out, evValue);
WritableUtils.writeString(out, name);
WritableUtils.writeVLong(out, id);
}
public int compareTo(Event o) {
long value = this.getDate().getTime() - o.getDate().getTime();
if (value == 0) {
return 0;
} else if (value > 1) {
return -1;
} else {
return 1;
}
}
public void setAllValues(String input) {
String[] arrValues = input.split(delimiter);
System.out.println("No of Values = " + arrValues.length);
try {
this.date = converStringToDate(arrValues[0], dateFormat);
} catch (ParseException pe) {
System.out.println("pe> Error in date");
}
if (arrValues.length >= 2) {
this.evName = arrValues[1];
}
if (arrValues.length >= 3) {
this.evType = arrValues[2];
}
if (arrValues.length >= 4) {
this.evValue = arrValues[3];
}
if (arrValues.length >= 5) {
this.name = arrValues[4];
}
}
public String getKey() {
//return convertDateToString(this.date) + this.evName + this.evType;
return this.evName;
}
}