1

で SQL over Spark Streaming を使用する例がいくつかありますforeachRDD()。しかし、SQL を使用したい場合tranform():

case class AlertMsg(host:String, count:Int, sum:Double)
val lines = ssc.socketTextStream("localhost", 8888)
lines.transform( rdd => {
  if (rdd.count > 0) {
    val t = sqc.jsonRDD(rdd)
    t.registerTempTable("logstash")
    val sqlreport = sqc.sql("SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY host ORDER BY host_c DESC LIMIT 100")
    sqlreport.map(r => AlertMsg(r(0).toString,r(1).toString.toInt,r(2).toString.toDouble))
  } else {
    rdd
  }
}).print()

私はそのようなエラーを得ました:

[エラー] /Users/raochenlin/Downloads/spark-1.2.0-bin-hadoop2.4/logstash/src/main/scala/LogStash.scala:52: メソッド transform の型パラメーターがありません: (transformFunc: org.apache. spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[U])(暗黙の証拠$5: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U ] が存在するため、引数に適用できます (org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg with String <: java.io.Serializable ]) [エラー] --- 理由 --- [エラー] 引数式の型が仮パラメータ型と互換性がありません。[エラー] が見つかりました: org.apache.spark.rdd.RDD[文字列] => org.apache.spark.rdd.RDD[_ >: 文字列 <: java.io.Serializable] を持つ LogStash.AlertMsg [エラー] が必要です: org.apache.spark.rdd.RDD[文字列] => org.apache.spark.rdd.

私が使用する場合にのみsqlreport.map(r => r.toString)、正しい使用法になる可能性がありますか?

4

1 に答える 1

0

dstream.transformtransformFunc: (RDD[T]) ⇒ RDD[U] この場合、if条件の両方の評価で同じ型になる必要がありますが、そうではありません。

if (count == 0) => RDD[String]
if (count > 0) => RDD[AlertMsg]

この場合、if rdd.count ...固有の変換パスが得られるように、の最適化を削除します。

于 2015-02-15T21:34:59.220 に答える