10

Spark Streaming を使用して Cassandra から読み取るときに問題が発生します。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

上記のリンクのように、私は使用します

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)

カサンドラからデータを選択しますが、スパークストリーミングには1回のクエリしかないようですが、間隔10秒を使用してクエリを続行したいです。

私のコードは次のとおりです。あなたの応答を願っています。

ありがとう!

import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue


object SimpleApp {
def main(args: Array[String]){
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")

    val ssc = new StreamingContext(conf, Seconds(10))

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

    //rdd.collect().foreach(println)

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()


    val dstream = ssc.queueStream(rddQueue)

    dstream.print()

    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
}  

}

4

2 に答える 2

9

CassandraRDD を入力として、ConstantInputDStream を作成できます。ConstantInputDStream は、各ストリーミング間隔で同じ RDD を提供し、その RDD でアクションを実行することにより、RDD 系統の具体化をトリガーし、毎回 Cassandra でクエリを実行することにつながります。

クエリ時間が長くなり、ストリーミング プロセスが不安定になるのを避けるために、クエリ対象のデータが無限に大きくならないようにしてください。

このような何かがうまくいくはずです(コードを出発点として使用します):

import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(conf, Seconds(10))

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

val dstream = new ConstantInputDStream(ssc, cassandraRDD)

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output
    println(rdd.collect.mkString("\n")) 
}
ssc.start()
ssc.awaitTermination()
于 2015-09-09T12:56:40.573 に答える