私のプログラムはすべて Hadoop の新しい MR1 インターフェイス (org.apache.hadoop.mapreduce) で作成しているため、avro の新しい org.apache.avro.mapreduce も使用したいと考えています。しかし、それは私にはうまくいきません。
プログラムは avro データの入力を受け取り、同じものを出力します。私のプログラムの背後にある主なアイデアは、avro ラップされたキー/値に対して Hadoop の Mapper と Reducer をサブクラス化することです。これが私のジョブドライバーのブロックです:
AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());
job.setMapperClass(MyAvroMap.class);
job.setReducerClass(MyAvroReduce.class);
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(AvroValue.class);
job.setOutputKeyClass(AvroKey.class);
job.setOutputValueClass(NullWritable.class);
MyAvroMap および MyAvroReduce サブクラスの定義はそれぞれ次のとおりです。
public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
AvroKey<CharSequence>, AvroValue<NetflowRecord>>{ ... }
public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>,
AvroKey<NetflowRecord>, NullWritable>{ ... }
メソッド化された NetflowRecord は、私の avro レコード クラスです。そして、実行中の例外が発生しました
java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey
Hadoop と avro のソース コードを読むと、このようにマップ キーが WritableComparable のサブクラスであることを確認するために、JobConf によって例外がスローされることがわかりました (hadoop1.2.1, line759)。
WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
しかし、avro は、AvroKey と AvroValue が Hadoopの Writable* インターフェイスをサブクラス化していない単純なラッパーであることを示しています。
テストしなくても、古い mapred インターフェースを使用できると思いますが、それは私が望んでいるものではありません。純粋な org.apache.avro.mapreduce インターフェイスを使用したプログラミングの例や説明を教えてもらえますか??
心から、
ジャミン