0

と の 2 つの RDD がpointsありpointsWithinEpsます。の各点は座標をpoints表します。2 点とそれらの間の距離を表します: . すべてのポイントをループし、すべてのポイントに対して (最初の) 座標にある要素のみをフィルター処理したいと考えています。だから私は次のようにします:x, ypointsWithinEps((x, y), distance)pointsWithinEpsx

    points.foreach(p =>
      val distances = pointsWithinEps.filter{
        case((x, y), distance) => x == p
      }
      if (distances.count() > 3) {
//        do some other actions
      }
    )

しかし、この構文は無効です。私の知る限り、Spark foreach 内で変数を作成することは許可されていません。私はこのようなことをすべきですか?

for (i <- 0 to points.count().toInt) {
  val p = points.take(i + 1).drop(i) // take the point
  val distances = pointsWithinEps.filter{
    case((x, y), distance) => x == p
  }
  if (distances.count() > 3) {
    //        do some other actions
  }
}

または、これを行うより良い方法はありますか?完全なコードはここでホストされています: https://github.com/timasjov/spark-learning/blob/master/src/DBSCAN.scala

編集:

points.foreach({ p =>
  val pointNeighbours = pointsWithinEps.filter {
    case ((x, y), distance) => x == p
  }
  println(pointNeighbours)
})

現在、次のコードがありますが、NullPointerException (pointsWithinEps) がスローされます。なぜそれを修正することができますかpointsWithinEps? (foreach に要素が含まれる前に) なぜ null なのですか?

4

2 に答える 2

2

特定の座標から始まるすべての距離ポイントを収集するために、次のように、その座標でポイントをキー設定xし、そのキーでグループ化する簡単な分散方法があります。

val pointsWithinEpsByX = pointsWithinEps.map{case ((x,y),distance) => (x,((x,y),distance))}
val xCoordinatesWithDistance = pointsWithinEpsByX.groupByKey

次に、点の RDD を前の変換の結果と左結合します。

val pointsWithCoordinatesWithDistance = points.leftOuterJoin(xCoordinatesWithDistance)
于 2014-10-26T20:47:45.867 に答える