スパーク マップ関数で小さなサブグラフを取得する必要があります。AnormCypher と NEO4J-SPARK-CONNECTOR を使用しようとしましたが、どちらも機能しません。AnormCypher は Java IOException エラーにつながります (mapPartition 関数で接続を構築し、localhost サーバーでテストします)。また、Neo4j-spark-connector は、以下の TASK NOT SERIALIZABLE EXCEPTION を引き起こします。
Sparkワーカーノードでサブグラフを取得する(またはneo4jのようなグラフデータベースに接続する)良い方法はありますか?
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
at ....
neo4j-spark-connector 2.0.0-m2 を使用した私のコード スニペット:
val neo = Neo4j(sc) // this runs on the driver
// this runs by a map function
def someFunctionToBeMapped(p: List[Long]) = {
val metaGraph = neo.cypher("match p = (a:TourPlace) -[r:could_go_to] -> (b:TourPlace)" +
"return a.id ,r.distance, b.id").loadRowRdd.map( row => ((row(0).asInstanceOf[Long],row(2).asInstanceOf[Long]), row(1).asInstanceOf[Double]) ).collect().toList
AnromCypher コードは次のとおりです。
def partitionMap(partition: Iterator[List[Long]]) = {
import org.anormcypher._
import play.api.libs.ws._
// Provide an instance of WSClient
val wsclient = ning.NingWSClient()
// Setup the Rest Client
// Need to add the Neo4jConnection type annotation so that the default
// Neo4jConnection -> Neo4jTransaction conversion is in the implicit scope
implicit val connection: Neo4jConnection = Neo4jREST("127.0.0.1", 7474, "neo4j", "000000")(wsclient)
//
// Provide an ExecutionContext
implicit val ec = scala.concurrent.ExecutionContext.global
val res = partition.filter( placeList => {
val startPlace = Cypher("match p = (a:TourPlace) -[r:could_go_to] -> (b:TourPlace)" +
"return p")().flatMap( row => row.data )
})
wsclient.close()
res
}