4

私のプログラムはすべて Hadoop の新しい MR1 インターフェイス (org.apache.hadoop.mapreduce) で作成しているため、avro の新しい org.apache.avro.mapreduce も使用したいと考えています。しかし、それは私にはうまくいきません。

プログラムは avro データの入力を受け取り、同じものを出力します。私のプログラムの背後にある主なアイデアは、avro ラップされたキー/値に対して Hadoop の Mapper と Reducer をサブクラス化することです。これが私のジョブドライバーのブロックです:

    AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
    AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());

    job.setMapperClass(MyAvroMap.class);
    job.setReducerClass(MyAvroReduce.class);

    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setMapOutputKeyClass(AvroKey.class);
    job.setMapOutputValueClass(AvroValue.class);

    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);

MyAvroMap および MyAvroReduce サブクラスの定義はそれぞれ次のとおりです。

public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
            AvroKey<CharSequence>, AvroValue<NetflowRecord>>{ ... }

public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>, 
                AvroKey<NetflowRecord>, NullWritable>{ ... }

メソッド化された NetflowRecord は、私の avro レコード クラスです。そして、実行中の例外が発生しました

java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey

Hadoop と avro のソース コードを読むと、このようにマップ キーが WritableComparable のサブクラスであることを確認するために、JobConf によって例外がスローされることがわかりました (hadoop1.2.1, line759)。

WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));

しかし、avro は、AvroKey と AvroValue が Hadoopの Writable* インターフェイスをサブクラス化していない単純なラッパーであることを示しています。

テストしなくても、古い mapred インターフェースを使用できると思いますが、それは私が望んでいるものではありません。純粋な org.apache.avro.mapreduce インターフェイスを使用したプログラミングの例や説明を教えてもらえますか??

心から、

ジャミン

4

1 に答える 1

5

このパッチhttps://issues.apache.org/jira/browse/AVRO-593の助けを借りて、懸命に検索した後、各 AvroKey および AvroValue ラッパーには、ジョブ構成にスキーマ定義が必要であることがわかりました。それが私が逃したものです。

ここで 2 つのオプションがあります。

  1. MyAvroMap と MyAvroReduce を変更しない場合は、CharSequence のスキーマを定義し、マッパー出力用の AvroJob でこのスキーマを宣言する必要があります。

    AvroJob.setMapOutputKeySchema(ジョブ、<"文字シーケンスの定義済みスキーマ">); AvroJob.setMapOutputValueSchema(ジョブ、NetflowRecord.getClassSchema());

  2. Mapper の出力キー/値を Text/AvroValue に変更することで、次のように Mapper の出力値のスキーマ宣言を追加するだけで済みます。

    job.setMapOutputKeyClass(Text.class); AvroJob.setMapOutputValueSchema(ジョブ、NetflowRecord.getClassSchema());

mapreduce API を使用すると、AvroMapper と AvroReducer をサブクラス化する必要がなくなります。ここでは、コードにスキーマ定義を追加せずに option2 を実装します。

ジャミン

于 2013-10-02T17:12:56.703 に答える