嵐のトポロジを書きました。私は基本的に avro スキーマのタプルをバイト配列の形式で kafka トピックに送信したいと考えています。
これは私がボルトを設定する方法です:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
.fieldsGrouping(BOLT1, new Fields("key"));
そして、これは私がバイト配列に変換する方法です
Schema schema = avroObject.getSchema();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(ping, encoder);
encoder.flush();
byte[] message = out.toByteArray();
String key = new String(message, "UTF-8");
次の方法でタプルを発行すると、kafka トピックに何も表示されません (バイトストリームを kafka に送信します):
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
しかし、代わりに、バイト配列を文字列に変換してからkafkaトピックに変換すると、機能します:
以下のようなもの:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
.fieldsGrouping(BOLT1, new Fields("key"));
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
私は何を間違っていますか?ストームカフカボルトを使用してバイトストリームをカフカトピックに送信するにはどうすればよいですか?