以下のコードを使用して、Kafka からのメッセージを読み取ることができます。
val ssc = new StreamingContext(sc, Seconds(50))
val topicmap = Map("test" -> 1)
val lines = KafkaUtils.createStream(ssc,"127.0.0.1:2181", "test-consumer-group",topicmap)
しかし、私は Kafka からの各メッセージを読み取って HBase に入れようとしています。これは HBase に書き込むコードですが、成功しません。
lines.foreachRDD(rdd => {
rdd.foreach(record => {
val i = +1
val hConf = new HBaseConfiguration()
val hTable = new HTable(hConf, "test")
val thePut = new Put(Bytes.toBytes(i))
thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(record))
})
})