11

これは、Spark Streaming で単純な SQL クエリを実行するためのコードです。

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration

object StreamingSQL {

  case class Persons(name: String, age: Int)

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))

    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people/")
    lines.foreachRDD(rdd=>rdd.foreach(println))

    val sqc = new SQLContext(sc);
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created

    lines.foreachRDD(rdd=>{
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

ご覧のとおり、ストリーミングで SQL を実行するには、foreachRDD メソッド内でクエリを作成する必要があります。2 つの異なるストリームから受信したデータに対して SQL 結合を実行したいと考えています。できる方法はありますか?

4

2 に答える 2

8

さて、議論の末に辿り着いた回避策をSpiroの回答にまとめたいと思います。最初に空のテーブルを作成し、次にそこに RDD を挿入するという彼の提案は見事でした。唯一の問題は、Spark がまだテーブルへの挿入を許可していないことです。できることは次のとおりです。

まず、ストリームから期待するスキーマと同じスキーマを持つ RDD を作成します。

import sqlContext.createSchemaRDD
val d1=sc.parallelize(Array(("a",10),("b",3))).map(e=>Rec(e._1,e._2))

次に、Parquet ファイルとして保存します

d1.saveAsParquetFile("/home/p1.parquet")

ここで、parquet ファイルをロードし、registerAsTable()メソッドを使用してテーブルとして登録します。

val parquetFile = sqlContext.parquetFile("/home/p1.parquet")
parquetFile.registerAsTable("data")

ストリームを受信したら、ストリームにforeachRDD()を適用し、 insertInto()メソッドを使用して上記で作成したテーブルに個々の RDD を挿入し続けます。

dStream.foreachRDD(rdd=>{
rdd.insertInto("data")
})

この insertInto() は正常に機能し、データをテーブルに収集できます。これで、任意の数のストリームに対して同じことを行い、クエリを実行できます。

于 2014-09-03T06:36:30.770 に答える