Spark Streaming を使用して Cassandra から読み取るときに問題が発生します。
上記のリンクのように、私は使用します
val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)
カサンドラからデータを選択しますが、スパークストリーミングには1回のクエリしかないようですが、間隔10秒を使用してクエリを続行したいです。
私のコードは次のとおりです。あなたの応答を願っています。
ありがとう!
import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue
object SimpleApp {
def main(args: Array[String]){
val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")
val ssc = new StreamingContext(conf, Seconds(10))
val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
//rdd.collect().foreach(println)
val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()
val dstream = ssc.queueStream(rddQueue)
dstream.print()
ssc.start()
rdd.collect().foreach(println)
rddQueue += rdd
ssc.awaitTermination()
}
}