https://github.com/miguno/kafka-storm-starterは、そのようなサンプル コードを提供します。
たとえば、AvroDecoderBoltを参照してください。そのjavadocから:
このボルトは、Avro スキーマに従ってシリアル化された、Avro でエンコードされたバイナリ形式の受信データを想定していますT
。受信データをT
pojo にデシリアライズし、この pojo を下流のコンシューマーに送信します。Injection.invert[T, Array[Byte]](bytes)
そのため、このボルトは、Avro データの Twitter Bijection に相当する Storm と見なすことができます。
どこ
T
Tweet
:使用されている基礎となる Avro スキーマに基づくAvro レコードのタイプ (例: a )。Avro の のサブクラスである必要がありますSpecificRecordBase
。
コードの重要な部分は次のとおりです (コードをこのスニペットにまとめました)。
// With T <: SpecificRecordBase
implicit val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]
val bytes: Array[Byte] = ...; // the Avro-encoded data
val decodeTry: Try[T] = Injection.invert(bytes)
decodeTry match {
case Success(pojo) =>
System.out.println("Binary data decoded into pojo: " + pojo)
case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}