1

Gobblin ( https://github.com/linkedin/gobblin/wiki )と呼ばれるテクノロジによってそこに置かれた HDFS からAVRO レコード ( https://avro.apache.org/ )を読み取る Java Spark アプリケーションを開発しようとしています。

サンプルの HDFS AVRO データ ファイル:

/gobblin/work/job-output/KAFKA/kafka-gobblin-hdfs-test/20150910213846_append/part.task_kafka-gobblin-hdfs-test_1441921123461_0.avro

残念ながら、Java で書かれた例は限られていることがわかりました。

私が見つけた最高のものは、Scala で書かれています (Hadoop バージョン 1 ライブラリを使用)。

どんな助けでも大歓迎です。

現在、次のコードを使用することを考えていますが、AVRO データから値の HashMap を抽出する方法がわかりません。

JavaPairRDD avroRDD = sc.newAPIHadoopFile( 
    path, 
    AvroKeyInputFormat.class, 
    AvroKey.class, 
    NullWritable.class, 
    new Configuration() );

// JavaPairRDD avroRDD = sc.newAPIHadoopFile( 
//    path, 
//    AvroKeyValueInputFormat.class, 
//    AvroKey.class, 
//    AvroValue.class, 
//    new Configuration() );

私の現在の Maven の依存関係:

<dependencies>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-mapred</artifactId>
        <version>1.7.6</version>
        <classifier>hadoop2</classifier>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.4.3</version>
    </dependency>


    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>
4

1 に答える 1

2

サンプルの Gobblin Avro レコードを入力として読み取り、Spark を使用して関連する結果を出力できる小さなプロトタイプを作成しました ( spark-hdfs-avro-test )。対処しなければならない問題がいくつかあったことは言及しておく価値があります。 コメントやフィードバックをいただければ幸いです。

問題 1: 現在の Avro リリース (1.7.7) と Java シリアル化には問題があります。

引用するには:

Spark は、オブジェクトをシリアル化するために Java の Serializable インターフェイスに依存しています。Avro オブジェクトは Serializable を実装していません。したがって、Spark で Avro オブジェクトを操作するには、Avro が生成したクラスをサブクラス化し、Serializable を実装する必要があります (例: https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com )。 /zenfractal/SerializableAminoAcid.java .

これに対処するために、私は独自の Serializable ラッパー クラスを作成しました。

問題 2: Avro メッセージに「キー」値が含まれていません。

残念ながら、すぐに使える入力形式を使用することができず、独自の入力形式を作成する必要がありました: AvroValueInputFormat

public class AvroValueInputFormat<T> extends FileInputFormat<NullWritable, AvroValue<T>> {

以下は使えませんでした。

# org.apache.avro.mapreduce.AvroKeyInputFormat
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {

# org.apache.avro.mapreduce.AvroKeyValueInputFormat
public class AvroKeyValueInputFormat<K, V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {

問題 3: AvroJob クラス セッターを使用してスキーマ値を設定できず、これを手動で行う必要がありました。

    hadoopConf.set( "avro.schema.input.key", Schema.create( org.apache.avro.Schema.Type.NULL ).toString() ); //$NON-NLS-1$
    hadoopConf.set( "avro.schema.input.value", Event.SCHEMA$.toString() ); //$NON-NLS-1$
    hadoopConf.set( "avro.schema.output.key", Schema.create( org.apache.avro.Schema.Type.NULL ).toString() ); //$NON-NLS-1$
    hadoopConf.set( "avro.schema.output.value", SeverityEventCount.SCHEMA$.toString() ); //$NON-NLS-1$
于 2015-10-05T21:20:58.810 に答える