graphx を使用して、spark のグラフ ネットワークに mapReduceTriplets を適用する際に問題が発生しています。
私はチュートリアルに従い、[Array[String],Int] としてまとめられた自分のデータを読み込んでいます。たとえば、私の頂点は次のとおりです。
org.apache.spark.graphx.VertexRDD[Array[String]]
例: (3999,Array(17, Low, 9))
そして、私のエッジは次のとおりです。
org.apache.spark.graphx.EdgeRDD[Int]
例: Edge(3999,4500,1)
頂点の配列 (上記の例 9) の最後の整数のいくつが最初の整数 (上記の例 17) と同じか異なるかをカウントする mapReduceTriplets を使用して、集約型関数を適用しようとしています。接続されたすべての頂点。
したがって、一致または不一致の数のカウントのリストになります。
私が抱えている問題は、mapReduceTriplets を使用して任意の関数を適用することです。私は scala にまったく慣れていないので、これは本当に明白かもしれませんが、graphx チュートリアルには、Graph[Double, Int] 形式のグラフを使用する例がありますが、私のグラフは Graph[Array[String],Int] の形式なので、最初のステップとして、例でグラフを使用してそこから作業する方法を見つけようとしています。
Graphx Web サイトの例は次のとおりです。
val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
Iterator((triplet.dstId, (1, triplet.srcAttr)))
} else {
// Don't send a message for this triplet
Iterator.empty
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
何かアドバイスをいただければ幸いです。または、mapreducetriplets を使用するよりも良い方法があると思われる場合は、喜んでお聞かせください。
編集された新しいコード
val nodes = (sc.textFile("C~nodeData.csv")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
val edges = GraphLoader.edgeListFile(sc, "C:~edges.txt")
val graph = edges.outerJoinVertices(nodes) {
case (uid, deg, Some(attrList)) => attrList
case (uid, deg, None) => Array.empty[String]
}
val countsRdd = graph.collectNeighbors(EdgeDirection.Either).leftOuterJoin(graph.vertices).map {
case (id, t) => {
val neighbors: Array[(VertexId, Array[String])] = t._1
val nodeAttr = (t._2)
neighbors.map(_._2).count( x => x.apply(x.size - 1) == nodeAttr(0))
}
}