0

これは私が投稿する最初の問題なので、情報や平凡な書式設定を見逃している場合はお詫びします. 必要に応じて更新できます。

できるだけ多くの詳細を追加しようとします。RDBMSデータをNeo4jのグラフノードと関係に変換する、あまり最適化されていないSparkジョブがあります。

これをする。これが私が従う手順です:

  1. spark sql と結合を使用して、非正規化されたデータフレーム 'data' を作成します。
  2. 「データ」の Foreach 行は、以下を行う graphInsert 関数を実行します。

    を。行の内容を読み取る
    b. c. neo4j サイファー クエリを作成します ( Mergeコマンドを使用して、都市が 1 つだけになるようにします。たとえば、シカゴが RDBMS テーブルに複数の行で表示される場合に、Neo4j でシカゴを作成します)
    c.
    d. neo4j に接続します 。クエリを実行します
    e. neo4j から切断する

これが私が直面している問題のリストです。

  1. 挿入は遅いです。

マージ クエリは作成よりも遅いことはわかっていますが、レコードごとに接続および切断する代わりにこれを行う別の方法はありますか? これは私の最初のドラフト コードであり、1 つの接続を使用して、異なる Spark ワーカー ノード上の複数のスレッドから挿入する方法に苦労している可能性があります。したがって、レコードごとに接続および切断します。

  1. ジョブはスケーラブルではありません。1コアでも問題なく動きます。2 つの Spark コアでジョブを実行するとすぐに、マージ クエリを実行している場合でも、同じ名前の 2 つの都市が突然表示されます。例: Merge の使用に違反しているシカゴの都市が 2 つあります。Merge は「存在しない場合は作成する」のような機能をしていると思います。

私の実装がneo4jの一部またはsparkで間違っているかどうかはわかりません。誰かがこれをより良い規模で実装するのに役立つドキュメントに私を導くことができれば、私はこの仕事のために最大限の可能性を利用する必要がある大きなスパーククラスターを持っているので役に立ちます.

アルゴリズムではなくコードに興味がある場合。以下は、scala での graphInsert の実装です。

class GraphInsert extends Serializable{
   var case_attributes = new Array[String](4)
   var city_attributes = new Array[String](2)
   var location_attributes = new Array[String](20)
   var incident_attributes = new Array[String](20)
   val prop = new Properties()
   prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
   // properties Neo4j
   val url_neo4j = prop.getProperty("url_neo4j")
   val neo4j_user = prop.getProperty("neo4j_user")
   val neo4j_password = prop.getProperty("neo4j_password")


   def graphInsert(data : Row){  
      val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0)  + ":'" +data(11) + "'," +case_attributes(1)  + ":'" +data(13)  + "'," +case_attributes(2)  + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0)  + ":" +data(0) + "," +incident_attributes(1)  + ":" +data(2)  + "," +incident_attributes(2)  + ":'" +data(3) +  "'," +incident_attributes(3)  + ":'" +data(8)+  "'," +incident_attributes(4)  + ":" +data(5) +  "," +incident_attributes(5)  + ":'" +data(4) +  "'," +incident_attributes(6)  + ":'" +data(6) +  "'," +incident_attributes(7)  + ":'" +data(1) +  "'," +incident_attributes(8)  + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0)  + ":" +data(9) + "," +location_attributes(1)  + ":" +data(10)  + "," +location_attributes(2)  + ":'" +data(19) +  "'," +location_attributes(3)  + ":'" +data(20)+  "'," +location_attributes(4)  + ":" +data(18) +  "," +location_attributes(5)  + ":" +data(21) +  "," +location_attributes(6)  + ":'" +data(17) +  "'," +location_attributes(7)  + ":" +data(22) +  "," +location_attributes(8)  + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
              println(query)
              try{
                      var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
                          var stmt = con.createStatement()
                          var rs = stmt.executeQuery(query)
                          con.close()
              }catch{
              case ex: SQLException =>{
                  println(ex.getMessage)
              }
              }
  } 

def operations(sqlContext: SQLContext){
    ....
    #Get 'data' before this step
    city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
    case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
    location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
    incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()

    data.foreach(graphInsert)

}

object GraphObject {
  def main(args: Array[String]) {  
      val conf = new SparkConf()
        .setAppName("GraphNeo4j")
        .setMaster("xyz")
        .set("spark.cores.max","2")
        .set("spark.executor.memory","10g")

      Logger.getLogger("org").setLevel(Level.ERROR)
      Logger.getLogger("akka").setLevel(Level.ERROR)
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      val graph = new GraphInsert()
      graph.operations(sqlContext)

  }
}
4

2 に答える 2

0

クロージャー内に記述したもの、つまり Worker で実行する必要があるものはすべて配布されます。詳細については、こちらをご覧ください: http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka

また、コア数を増やしてもアプリケーションに影響を与えてはいけないと思います。それからそれは貪欲なアプローチを取ります! このドキュメントがお役に立てば幸いです。

于 2016-06-23T18:14:03.087 に答える