2つの整数の組み合わせであるオブジェクト「IntPair」をソートするために使用されるhadoopで演習を行いました。入力ファイルは次のとおりです。
2,9
3,8
2,6
3,2
...
クラス「IntPair」は次のようになります。
static class IntPair implements WritableComparable<IntPair> {
private int first;
private int second;
...
public int compareTo(IntPair o) {
return (this.first==o.first)?(this.second==o.second?0:(this.second>o.second?1:-1)):(this.first>o.first?1:-1);
}
public static int compare(int a, int b) {
return (a==b)?0:((a>b)?1:-1);
}
...
}
Mapper では、inputFormat と outputKey/Value を使用し、1 行に 2 つの整数を含む IntPair インスタンスを作成するだけです。
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String v[] = value.toString().split(",");
IntPair k = new IntPair(Integer.parseInt(v[0]), Integer.parseInt(v[1]));
context.write(k, NullWritable.get());
}
最初の整数に基づいてマッパーの結果を分割し、最初の整数に基づいてグループ コンパレータも作成します。ソート コンパレータのみが両方の整数に基づいています。
static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
public int getPartition(IntPair key, NullWritable value, int numPartitions) {
return Math.abs(key.getFirst()*127)%numPartitions;
}
}
static class BothComparator extends WritableComparator {
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair p1 = (IntPair)w1;
IntPair p2 = (IntPair)w2;
int cmp = IntPair.compare(p1.getFirst(), p2.getFirst());
if(cmp != 0) {
return cmp;
}
return -IntPair.compare(p1.getSecond(), p2.getSecond());//reverse sort
}
}
static class FirstGroupComparator extends WritableComparator {
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair p1 = (IntPair)w1;
IntPair p2 = (IntPair)w2;
return IntPair.compare(p1.getFirst(), p2.getFirst());
}
}
そして、Reducer では、IntPair をキーとして出力し、NullWritable を値として出力します。
static class SSReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
protected void reduce(IntPair key, Iterable<NullWritable> values,
Context context)throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
Hadoop を実行した後、次の結果が得られました。
2,9
3,8
以前、レデューサーはレコードをキー (IntPair) でグループ化する必要があると考えていました。各レコードは異なるキーを表すため、各レコードはメソッド「reduce」を 1 回呼び出します。その場合、結果は次のようになります。
2,9
2,6
3,8
3,2
したがって、グループコンパレーターは比較に最初の整数のみを使用するため、違いがあると思いました。したがって、レデューサーでは、レコードは最初の整数でグループ化されます。この例では、2 つのレコードのそれぞれが「reduce」を 1 回呼び出すことを意味するため、ループせずにグループごとに最初のレコードのみが生成されます。そうですか?また、レデューサーを次のように変更する別の実験を行いました。
static class SSReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
protected void reduce(IntPair key, Iterable<NullWritable> values,
Context context)throws IOException, InterruptedException {
for(NullWritable n : values) //add looping
context.write(key, NullWritable.get());
}
}
次に、4 つの項目がある結果を生成します。
また、両方の整数を使用して比較するように groupcomparator を変更すると、4 つの項目も生成されます。そのため、reducer は実際には groupcomparator を使用してキーをグループ化します。つまり、キーが異なっていても、1 つのグループ内の各レコードが 'reduce' を 1 回呼び出します。