0

次のデータ サンプルを使用して、Spark で Zeppelin をテストしています。

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val vertexArray = Array(
(1L, ("Alice", 28)),
(2L, ("Bob", 27)),
(3L, ("Charlie", 65)),
(4L, ("David", 42)),
(5L, ("Ed", 55)),
(6L, ("Fran", 50))
)
val edgeArray = Array(
Edge(2L, 1L, 7),
Edge(2L, 4L, 2),
Edge(3L, 2L, 4),
Edge(3L, 6L, 3),
Edge(4L, 1L, 1),
Edge(5L, 2L, 2),
Edge(5L, 3L, 8),
Edge(5L, 6L, 3)
)

val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

Spark-Shell でコードが正常に動作していても、Zeppelin が常に出力を表示できるとは限らないことに気付きました。以下は例ですが、これを修正する方法はありますか?

graph.vertices.filter { case (id, (name, age)) => age > 30 }.foreach {
case (id, (name, age)) => println(s"$name is $age")
}
4

1 に答える 1

3

ここで修正することは本当に何もありません。これは単に予期される動作です。クロージャー内のコードforeachは、ノートブックが実行されているドライバーではなくワーカーで実行されます。その出力は、構成に応じてキャプチャできますが、依存できるものではありません。

ドライバー プログラムから出力したい場合、最適なオプションは、ローカルでcollect変換またはtoLocalIterator反復することです。

graph.vertices.filter { case (id, (name, age)) => age > 30 }.collect.foreach {
  case (id, (name, age)) => println(s"$name is $age")
}
于 2015-12-03T15:08:02.853 に答える