zeppelin ノートブックで Spark ストリーミングによって消費される kafka メッセージを保存する際に問題があります。
私のコードは次のとおりです。
case class Message(id: Long, message: String, timestamp: Long) extends Serializable
val ssc = new StreamingContext(sc, Seconds(2))
val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
Map("test" -> 4),
StorageLevel.MEMORY_ONLY)
.map { case (k, v) => implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
.filter(_.id % 2 == 0)
val mes = messagesStream.window(Seconds(10))
mes
.map(m => Message(m.id, m.message, m.timestamp))
.foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))
ssc.start()
実行する%sql select * from messages
とデータは表示されませんが、テーブルは定義されています。Cassandraで保存をtempTableに変更すると、データが正しく保存されて表示されます。なぜそうなのか理解できません。
手伝ってくれてありがとう。