4

zeppelin ノートブックで Spark ストリーミングによって消費される kafka メッセージを保存する際に問題があります。

私のコードは次のとおりです。

case class Message(id: Long, message: String, timestamp: Long) extends Serializable

   val ssc = new StreamingContext(sc, Seconds(2))

  val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, 
    Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
    Map("test" -> 4),
    StorageLevel.MEMORY_ONLY)
    .map { case (k, v) =>  implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
    .filter(_.id % 2 == 0)

  val mes =  messagesStream.window(Seconds(10))

  mes
  .map(m => Message(m.id, m.message, m.timestamp))
  .foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))

  ssc.start() 

実行する%sql select * from messagesとデータは表示されませんが、テーブルは定義されています。Cassandraで保存をtempTableに変更すると、データが正しく保存されて表示されます。なぜそうなのか理解できません。

手伝ってくれてありがとう。

4

2 に答える 2

0

別のクラスターを回避したい場合: 別の解決策は、rdd を行に変換してから df に変換し、それを寄木細工または orc として hdfs に保存し、ファイル ex を追加するオプションを使用することです。

write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")

AWS ブロガーが一時テーブル re [ここにリンクの説明を入力][1] で直接分析を実行できたのはなぜだろうかと思っています。

良いことは、構造化ストリーミングが間もなく登場することです:)

[1]:aws ブログ: https://blogs.aws.amazon.com/bigdata/post/Tx3K805CZ8WFBRP/Analyze-Realtime-Data-from-Amazon-Kinesis-Streams-Using-Zeppelin-and-Spark-Stream

于 2016-08-15T20:25:15.147 に答える