1

SpecificRecordBaseに似た Avro オブジェクトで Bijection を実行する例を探しています。または、クラスを Kafka キーと値のシリアライザーとしてGenericRecordBase使用するより簡単な方法がある場合。AvroSerializer

Injection<GenericRecord, byte[]> genericRecordInjection =
                                        GenericAvroCodecs.toBinary(schema);
byte[] bytes = genericRecordInjection.apply(type);
4

2 に答える 2

1

https://github.com/miguno/kafka-storm-starterは、そのようなサンプル コードを提供します。

たとえば、AvroDecoderBoltを参照してください。そのjavadocから:

このボルトは、Avro スキーマに従ってシリアル化された、Avro でエンコードされたバイナリ形式の受信データを想定していますT。受信データをTpojo にデシリアライズし、この pojo を下流のコンシューマーに送信します。Injection.invert[T, Array[Byte]](bytes)そのため、このボルトは、Avro データの Twitter Bijection に相当する Storm と見なすことができます。

どこ

TTweet:使用されている基礎となる Avro スキーマに基づくAvro レコードのタイプ (例: a )。Avro の のサブクラスである必要がありますSpecificRecordBase

コードの重要な部分は次のとおりです (コードをこのスニペットにまとめました)。

// With T <: SpecificRecordBase

implicit val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]

val bytes: Array[Byte] = ...; // the Avro-encoded data
val decodeTry: Try[T] = Injection.invert(bytes)
decodeTry match {
  case Success(pojo) =>
    System.out.println("Binary data decoded into pojo: " + pojo)
  case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}
于 2016-08-05T17:29:58.767 に答える