1

のソースを見るとouterJoinVertices

これはバグなのか仕様なのか気になる

override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
      (other: RDD[(VertexId, U)])
      (updateF: (VertexId, VD, Option[U]) => VD2)
      (implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
    // The implicit parameter eq will be populated by the compiler if VD and VD2 are equal, and left
    // null if not
    if (eq != null) {
      vertices.cache() // <===== what if I wanted it serialized? 
      // updateF preserves type, so we can use incremental replication
      val newVerts = vertices.leftJoin(other)(updateF).cache()
      val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
        .updateVertices(changedVerts)
      new GraphImpl(newVerts, newReplicatedVertexView)
    } else {
      // updateF does not preserve type, so we must re-replicate all vertices
      val newVerts = vertices.leftJoin(other)(updateF)
      GraphImpl(newVerts, replicatedVertexView.edges)
    }
  }

質問

  1. グラフ/結合された頂点が既に別のものを介してキャッシュされている場合StorageLevel(例: MEMORY_ONLY_SER) - これが原因org.apache.spark.graphx.impl.ShippableVertexPartitionOps ... WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.ですか?

  2. この場合、これは Spark のバグですか (これは 1.3.1 からのものです)? もしそうなら、これに関するJIRAの問題を見つけることができませんでした(しかし、私はあまりよく見ていませんでした...)

  3. このメソッドに新しい StorageLevel を提供するほど簡単に修正できないのはなぜですか?

  4. この問題の回避策は何ですか? (私が考えることができるのは、vertices.join(otherVertices)とoriginalGraph.edgesなどで新しいグラフを作成することです...しかし、それは間違っていると感じています...

4

1 に答える 1

1

まあ、それは実際にはバグではないと思います。

そのコードを見るとVertexRDD、キャッシュ メソッドがオーバーライドStorageLevelされ、この頂点の作成に使用されたオリジナルが使用されます。

  override def cache(): this.type = {
    partitionsRDD.persist(targetStorageLevel)
    this
  }
于 2015-05-14T18:49:20.683 に答える