** kafka からデータをストリーミングして、データ フレームに変換しようとしています。このリンクをたどった
しかし、プロデューサー アプリケーションとコンシューマー アプリケーションの両方を実行している場合、これがコンソールの出力です。**
(0,[B@370ed56a) (1,[B@2edd3e63) (2,[B@3ba2944d) (3,[B@2eb669d1) (4,[B@49dd304c) (5,[B@4f6af565) (6 ,[B@7714e29e)
これは文字通り kafka プロデューサーの出力であり、トピックはメッセージをプッシュする前は空です。
プロデューサーのコード スニペットは次のとおりです。
Properties props = new Properties();
props.put("bootstrap.servers", "##########:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("producer.type", "async");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(EVENT_SCHEMA);
Injection<GenericRecord, byte[]> records = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
for (int i = 0; i < 100; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
setEventValues(i, avroRecord);
byte[] messages = records.apply(avroRecord);
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<String, byte[]>(
"topic", String.valueOf(i),messages);
System.out.println(producerRecord);
producer.send(producerRecord);
}
そしてその出力は次のとおりです。
キー=0、値=[B@680387a キー=1、値=[B@32bfb588 キー=2、値=[B@2ac2e1b1 キー=3、値=[B@606f4165 キー=4、値=[B@282e7f59
これは、scala で記述された私のコンシューマー コード スニペットです。
"group.id" -> "KafkaConsumer",
"zookeeper.connection.timeout.ms" -> "1000000"
val topicMaps = Map("topic" -> 1)
val messages = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)
messages.print()
createStream() で StringDecoder と DefaultDecoder の両方を試してみました。プロデューサーとコンシューマーは互いに準拠していると確信しています。誰からの助けはありますか?