2

以下のコードを使用して、Kafka からのメッセージを読み取ることができます。

val ssc = new StreamingContext(sc, Seconds(50)) 
val topicmap = Map("test" -> 1)
val lines = KafkaUtils.createStream(ssc,"127.0.0.1:2181", "test-consumer-group",topicmap)

しかし、私は Kafka からの各メッセージを読み取って HBase に入れようとしています。これは HBase に書き込むコードですが、成功しません。

lines.foreachRDD(rdd => {
  rdd.foreach(record => {
    val i = +1
    val hConf = new HBaseConfiguration() 
    val hTable = new HTable(hConf, "test") 
    val thePut = new Put(Bytes.toBytes(i)) 
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(record)) 
  })
})
4

2 に答える 2

5

Put を実際に実行しているのではなく、Put リクエストを作成してデータを追加しているだけです。あなたが欠けているのは

hTable.put(thePut);
于 2014-12-02T10:05:10.160 に答える