0

イベントが発生した時間に基づいて、Mapper クラスからのイベントにシーケンス番号を割り当てたいと考えています。

たとえば、時間を含む 100 のイベントがあります。時間に基づいてそれらをソートし、リデューサーフェーズでそれらにシーケンス番号を割り当てたいと思います。また、レデューサー フェーズで重複しているレコードを削除します (同じイベントが同時に発生している場合)。

マッパーの方法:

public class EventMapper extends Mapper<LongWritable, Text, Text, Event> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    Text newKey;
    Event e = new Event();
    e.setAllValues(line);
    newKey = new Text(e.getKey());
    context.write(newKey, e);
}
}

リデューサーメソッド(私が望むもの):

public class EventReducer extends Reducer<Text, Event, Text, Text> {

public void reduce(Text key, Iterator<Event> itrtr, Context context) throws IOException, InterruptedException {
    Event e;
    List<Event> l = new ArrayList<Event>();
    while(itrtr.hasNext()){
        e = itrtr.next();
         l.add(e);
    }
    Collections.sort(l);
    long i = 1;
    for (Event event : l) {
        event.setId(++i);
        context.write(key, new Text(event.toString()));
    }
}
}

すべての ID を 0 として取得します。どうすればこれを達成できますか? 私は間違ったアプローチに従っていますか?

イベントクラスは次のとおりです。

public class Event implements Writable, WritableComparable<Event> {
//Some variables and getter + setters
 @Override
public String toString() {
    String delimiter1 = "|";
    return this.date + delimiter1
            + this.evName + delimiter1
            + this.evType + delimiter1
            + this.evValue + delimiter1
            + this.name + delimiter1
            + this.id;
}

@Override
public void readFields(DataInput in) throws IOException {
    try {
        this.date = converStringToDate((WritableUtils.readString(in)).toString(), dateFormat);
    } catch (ParseException ex) {
        System.out.println("Wront date . Pe");
    }
    this.evName = WritableUtils.readString(in);
    this.evType = WritableUtils.readString(in);
    this.evValue = WritableUtils.readString(in);
    this.name = WritableUtils.readString(in);
    this.id = WritableUtils.readVLong(in);
}

@Override
public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
    WritableUtils.writeString(out, this.convertDateToString(date));
    WritableUtils.writeString(out, evName);
    WritableUtils.writeString(out, evType);
    WritableUtils.writeString(out, evValue);
    WritableUtils.writeString(out, name);
    WritableUtils.writeVLong(out, id);
}

public int compareTo(Event o) {
    long value = this.getDate().getTime() - o.getDate().getTime();
    if (value == 0) {
        return 0;
    } else if (value > 1) {
        return -1;
    } else {
        return 1;
    }
    }
public void setAllValues(String input) {
    String[] arrValues = input.split(delimiter);
    System.out.println("No of Values = " + arrValues.length);
    try {
        this.date = converStringToDate(arrValues[0], dateFormat);
    } catch (ParseException pe) {
        System.out.println("pe> Error in date");
    }
    if (arrValues.length >= 2) {
        this.evName = arrValues[1];
    }
    if (arrValues.length >= 3) {
        this.evType = arrValues[2];
    }
    if (arrValues.length >= 4) {
        this.evValue = arrValues[3];
    }
    if (arrValues.length >= 5) {
        this.name = arrValues[4];
    }
}

public String getKey() {
    //return convertDateToString(this.date) + this.evName + this.evType;
    return this.evName;
}
}
4

1 に答える 1

0

いくつかの提案:

  • date.getTime() を返すように getKey() を変更します。これは長い値であり、文字列よりも比較が高速です。内部キーのタイプを LongWritable に変更します。
  • レデューサーに渡される前にキー値でレコードをソートする Hadoop の動作を悪用しています。これは並べ替えの 1 つの方法ですが、ジョブ構成で numberOfReducers を 1 に設定する必要があります。そうしないと、複数のレデューサーがそれぞれのパーティションに 1 から始まるランクを割り当てることになります。
  • 複数のレデューサーを使用することもできますが、このジョブの後に、内部でランク付けされたすべてのデータ パーティションをマージするジョブを実行する必要があります。
  • レデューサーは、そのキーを持つ複数のレコードがある場合でも (同時に複数のイベントなど)、各キー値に対して 1 回呼び出されることに注意してください。これらの重複イベントを無視したい場合は、値に含まれるレコードの数に関係なく、リデューサーはコンテキストに 1 つのレコードのみを書き込む必要がありIterableます。
  • ランク (id) を正しく割り当てるには、レデューサーに long 型のインスタンス変数が必要です (それを と呼びますcounter)。メソッドで初期化し、setup()メソッドでインクリメントする必要がありますreduce()
于 2012-11-28T14:43:51.427 に答える