4

私はSparkとGraphXを初めて使用し、接続されたコンポーネントを見つけるためにそのアルゴリズムでいくつかの実験を行いました。グラフの構造がパフォーマンスに強い影響を与えているように見えることに気付きました。

数百万の頂点とエッジを持つグラフを計算できましたが、グラフの特定のグループについて、アルゴリズムは時間内に終了せず、最終的にOutOfMemoryError: GC overhead limit exceeded.

このアルゴリズムは、長いパスを含むグラフに問題があるようです。たとえば、このグラフ{ (i,i+1) | i <- {1..200} }の場合、計算は失敗します。ただし、推移的なエッジを追加すると、計算はすぐに終了しました。

{ (i,j) | i <- {1..200}, j <- {i+1,200} }

また、このようなグラフは問題ありませんでした:

{ (i,1) | i <- {1..200} }

問題を再現するための最小限の例を次に示します。

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable

object Matching extends Logging {

  def main(args: Array[String]): Unit = {
    val fname = "input.graph"
    val optionsList = args.drop(1).map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => opt -> v
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
    val options = mutable.Map(optionsList: _*)

    val conf = new SparkConf()
    GraphXUtils.registerKryoClasses(conf)

    val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
      .map(PartitionStrategy.fromString(_))
    val edgeStorageLevel = options.remove("edgeStorageLevel")
      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
    val vertexStorageLevel = options.remove("vertexStorageLevel")
      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)

    val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
    val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
      edgeStorageLevel = edgeStorageLevel,
      vertexStorageLevel = vertexStorageLevel).cache()
    log.info("Loading graph...")
    val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
    log.info("Loading graph...done")

    log.info("Computing connected components...")
    val cc = ConnectedComponents.run(graph)
    log.info("Computed connected components...done")

    sc.stop()
  }
}

ファイルは次のinput.graphようになります (10 個のノード、それらを接続する 9 個のエッジ):

1 2
2 3
3 4
4 5
5 6
6 7
7 8
8 9
9 10

失敗するとハングしConnectedComponents.run(graph)ます。エラーメッセージは次のとおりです。

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.regex.Pattern.compile(Pattern.java:1054)
    at java.lang.String.replace(String.java:2239)
    at org.apache.spark.util.Utils$.getFormattedClassName(Utils.scala:1632)
    at org.apache.spark.storage.RDDInfo$$anonfun$1.apply(RDDInfo.scala:58)
    at org.apache.spark.storage.RDDInfo$$anonfun$1.apply(RDDInfo.scala:58)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.storage.RDDInfo$.fromRdd(RDDInfo.scala:58)
    at org.apache.spark.scheduler.StageInfo$$anonfun$1.apply(StageInfo.scala:80)
    at org.apache.spark.scheduler.StageInfo$$anonfun$1.apply(StageInfo.scala:80)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.scheduler.StageInfo$.fromStage(StageInfo.scala:80)
    at org.apache.spark.scheduler.Stage.<init>(Stage.scala:99)
    at org.apache.spark.scheduler.ShuffleMapStage.<init>(ShuffleMapStage.scala:44)
    at org.apache.spark.scheduler.DAGScheduler.newShuffleMapStage(DAGScheduler.scala:317)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$newOrUsedShuffleStage(DAGScheduler.scala:352)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage$1.apply(DAGScheduler.scala:286)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage$1.apply(DAGScheduler.scala:285)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.mutable.Stack.foreach(Stack.scala:170)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:285)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:389)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:386)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:386)
    at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:398)

ローカルの Spark ノードを実行しており、次のオプションを使用して JVM を開始しています。

-Dspark.master=local -Dspark.local.dir=/home/phil/tmp/spark-tmp -Xms8g -Xmx8g

このおもちゃのグラフ (201 ノードと 200 エッジ) で問題が発生する理由を理解するのを手伝ってもらえますか? 一方、数百万のエッジを持つ現実的なグラフを約 80 秒で解決できますか? (どちらの例でも、同じセットアップと構成を使用しています。)

アップデート:

スパークシェルでも再現可能:

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

val graph = GraphLoader.edgeListFile(sc, "input.graph").cache()
ConnectedComponents.run(graph)

バグ レポートを作成しました: SPARK-15042

4

1 に答える 1

0

SPARK-15042によると、問題は 2.1.0-snapshot にまだ存在します。

バグ修正の進捗状況はSPARK-5484で確認できます。

于 2016-10-22T13:26:57.147 に答える