1

私はかなり新しい Spark Streaming

2 つの値 x y を含むストリーミング データがあります。例えば

1 300

2 8754

3 287

ストリーミングされたデータから、最小の y 値、最大の y 値、および x 値の平均を取得したいと考えています。これは次のように出力する必要があります (上記の例を使用)。

287 8754 4

個々の変換/縮小でこれらの値を計算できましたが、単一の変換ではできませんでした

これが私の現在のコードです

val transformedStream = windowStream.map(line => {
  Array(line.split(" ")(0).toLong, line.split(" ")(1).toLong)

val smallest: DStream[Double]  = transformedStream.reduce((a,b) => {
  Array(0, math.min(a(1), b(1)))
}).map(u => u(1).toDouble)

val biggest  = transformedStream.reduce((a,b) => {
  Array(0, math.max(a(1), b(1)))
}).map(u => u(1).toDouble)

val mean = transformedStream.reduce((a, b) => Array( (a(0) + b(0))/2 )).
  map(u => u(0).toDouble)
4

1 に答える 1

2

これを試して:

val spark: SparkSession = ???
import spark.implicits._

windowStream.transofrm( rdd => {
  val xy = rdd.map(_.split(" ")).collect {
    case Array(x, y) => (x.toLong, y.toLong)
  }
  xy.toDF("x", "y").agg(min("y"), max("y"), mean("x"))
  .as[(Long, Long, Double)].rdd
})

重要:

transformedStream.reduce((a, b) => Array( (a(0) + b(0))/2 )  

の平均を計算しませんx

于 2016-11-07T01:25:43.210 に答える