Avro 1.7.4 ファイルを受け取る Mapper (.mapreduce を使用) API を作成しようとしています。特定の API (maven avro プラグイン) を使用して、.avdl ファイルからオブジェクトを生成しています。
Avro ファイルが正しく生成され、Avro ツール jar の avro-to-json 関数を使用してファイルを読み取ることができることを確認しました。
私が受け取っているエラーは次のとおりです。
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to
org.lab41.cyprus.domain.NetworkRecord
at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:45)
at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:23)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:263)\
ドライバークラスは次のとおりです。
public class RollupAvroFiles extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
String input, output;
if (otherArgs.length == 2) {
input = otherArgs[0];
output = otherArgs[1];
} else {
return 1;
}
/** configure Job **/
Job job = new Job(conf, "RollupAvroFiles");
job.setJarByClass(RollupAvroFiles.class);
job.setUserClassesTakesPrecedence(true);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job, new Path(input));
job.setInputFormatClass(AvroKeyInputFormat.class);
AvroJob.setInputKeySchema(job, NetworkRecord.SCHEMA$);
job.setMapperClass(RollupAvroFilesMapper.class);
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.LONG));
AvroJob.setMapOutputValueSchema(job, NetworkRecord.SCHEMA$);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(AvroValue.class);
job.setReducerClass(RollupAvroFilesReducer.class);
AvroJob.setOutputKeySchema(job, NetworkRecord.SCHEMA$);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(output));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new RollupAvroFiles(), args);
System.exit(exitCode);
}
}
そしてMapperクラスは次のように読みます:
public class RollupAvroFilesMapper
extends Mapper<AvroKey<NetworkRecord>, NullWritable, AvroKey<Long>,
AvroValue<NetworkRecord>> {
@Override
protected void setup(Context context) throws IOException, InterruptedException {
}
@Override
protected void map(AvroKey<NetworkRecord> key, NullWritable value, Context context)
throws IOException, InterruptedException {
NetworkRecord = key.datum();
….
}
}
どんな考えでもいただければ幸いです