3

この質問は、Spark GraphX に関するものです。特定の他のノードの隣接ノードを削除して、サブグラフを計算したいと考えています。

[タスク] C2 ノードの隣接ノードではない A ノードと B ノードを保持します。

入力グラフ:

                    ┌────┐
              ┌─────│ A  │──────┐
              │     └────┘      │
              v                 v
┌────┐     ┌────┐            ┌────┐     ┌────┐
│ C1 │────>│ B  │            │ B  │<────│ C2 │
└────┘     └────┘            └────┘     └────┘
              ^                 ^
              │     ┌────┐      │
              └─────│ A  │──────┘
                    └────┘

出力グラフ:

         ┌────┐
   ┌─────│ A  │
   │     └────┘
   v           
┌────┐         
│ B  │         
└────┘         
   ^           
   │     ┌────┐
   └─────│ A  │
         └────┘

出力グラフを返すGraphXクエリをエレガントに書く方法は?

4

3 に答える 3

3

val nodesABを使用して見つける別の方法GraphOps.collectNeighbors

val nodesAB = graph.collectNeighbors(EdgeDirection.Either)
  .filter{case (vid,ns) => ! ns.map(_._2).contains("C2")}.map(_._1)
  .intersection(
    graph.vertices
      .filter{case (vid,attr) => ! attr.toString.startsWith("C") }.map(_._1)
  )

残りはあなたが持っていたのと同じように動作します:

val solution1 = Graph(nodesAB, graph.edges) .
subgraph(vpred = {case(id, label) => label != null})

(?) よりスケーラブルな DataFrame を使用する場合は、最初に nodesAB を DataFrame に変換する必要があります。

val newNodes = sqlContext.createDataFrame(
  nodesAB,
  StructType(Array(StructField("newNode", LongType, false)))
)

そして、これを使用して DataFrame を作成し、縁取りします。

val edgeDf = sqlContext.createDataFrame(
  graph.edges.map{edge => Row(edge.srcId, edge.dstId, edge.attr)}, 
  StructType(Array(
    StructField("srcId", LongType, false),
    StructField("dstId", LongType, false),
    StructField("attr", LongType, false)
  ))
)

次に、これを実行して、サブグラフなしでグラフを作成できます。

val solution1 = Graph(
  nodesAB, 
  edgeDf
  .join(newNodes, $"srcId" === $"newNode").select($"srcId", $"dstId", $"attr")
  .join(newNodes, $"dstId" === $"newNode")
  .rdd.map(row => Edge(row.getLong(0), row.getLong(1), row.getLong(2)))
)
于 2015-05-22T21:14:15.623 に答える