Hadoop がどのように動作するかを学ぶために、独自の WritableComparable クラスを作成することにしました。そこで、2 つのインスタンス変数 (orderNumber cliente) を持つ Order クラスを作成し、必要なメソッドを実装しました。getter/setter/hashCode/equals/toString にも Eclipse ジェネレーターを使用しました。
compareTo では、orderNumber 変数のみを使用することにしました。
データセット内の注文の発生をカウントするためだけに、単純な MapReduce ジョブを作成しました。ここでわかるように、誤って私のテスト記録の 1 つが Itá ではなく Ita になっています。
123 Ita
123 Itá
123 Itá
345 Carol
345 Carol
345 Carol
345 Carol
456 Iza Smith
レコード 1 の hashCode はレコード 2 および 3 の hashCode とは異なるため、最初のレコードは別の順序で処理する必要があることを理解しています。
しかし、reduce フェーズでは、3 つのレコードがグループ化されます。ここでわかるように:
Order [cliente=Ita, orderNumber=123] 3
Order [cliente=Carol, orderNumber=345] 4
Order [cliente=Iza Smith, orderNumber=456] 1
カウント 2 の Itá レコードの行が必要で、Ita にはカウント 1 が必要だと思いました。
compareTo で orderNumber のみを使用したので、このメソッドで String cliente を使用しようとしました (以下のコードでコメント)。そして、それは私が期待していたように機能しました。
それで、それは期待される結果ですか?hadoop は hashCode のみを使用してキーとその値をグループ化するべきではありませんか?
Order クラスは次のとおりです (ゲッター/セッターは省略しました)。
public class Order implements WritableComparable<Order>
{
private String cliente;
private long orderNumber;
@Override
public void readFields(DataInput in) throws IOException
{
cliente = in.readUTF();
orderNumber = in.readLong();
}
@Override
public void write(DataOutput out) throws IOException
{
out.writeUTF(cliente);
out.writeLong(orderNumber);
}
@Override
public int compareTo(Order o) {
long thisValue = this.orderNumber;
long thatValue = o.orderNumber;
return (thisValue < thatValue ? -1 :(thisValue == thatValue ? 0 :1));
//return this.cliente.compareTo(o.cliente);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((cliente == null) ? 0 : cliente.hashCode());
result = prime * result + (int) (orderNumber ^ (orderNumber >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Order other = (Order) obj;
if (cliente == null) {
if (other.cliente != null)
return false;
} else if (!cliente.equals(other.cliente))
return false;
if (orderNumber != other.orderNumber)
return false;
return true;
}
@Override
public String toString() {
return "Order [cliente=" + cliente + ", orderNumber=" + orderNumber + "]";
}
MapReduce コードは次のとおりです。
public class TesteCustomClass extends Configured implements Tool
{
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Order, LongWritable>
{
LongWritable outputValue = new LongWritable();
String[] campos;
Order order = new Order();
@Override
public void configure(JobConf job)
{
}
@Override
public void map(LongWritable key, Text value, OutputCollector<Order, LongWritable> output, Reporter reporter) throws IOException
{
campos = value.toString().split("\t");
order.setOrderNumber(Long.parseLong(campos[0]));
order.setCliente(campos[1]);
outputValue.set(1L);
output.collect(order, outputValue);
}
}
public static class Reduce extends MapReduceBase implements Reducer<Order, LongWritable, Order,LongWritable>
{
@Override
public void reduce(Order key, Iterator<LongWritable> values,OutputCollector<Order,LongWritable> output, Reporter reporter) throws IOException
{
LongWritable value = new LongWritable(0);
while (values.hasNext())
{
value.set(value.get() + values.next().get());
}
output.collect(key, value);
}
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(),TesteCustomClass.class);
conf.setMapperClass(Map.class);
// conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setJobName("Teste - Custom Classes");
conf.setOutputKeyClass(Order.class);
conf.setOutputValueClass(LongWritable.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(),new TesteCustomClass(),args);
System.exit(res);
}
}