1

私はチュートリアル http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.htmlで実行しています

そして、ある時点で mapReduceTriplets 操作を使用します。これは期待される結果を返します

// Find the oldest follower for each user
val oldestFollower: VertexRDD[(String, Int)] = userGraph.mapReduceTriplets[(String, Int)](
  // For each edge send a message to the destination vertex with the attribute of the source vertex
  edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),
  // To combine messages take the message for the older follower
  (a, b) => if (a._2 > b._2) a else b
)

しかし、IntelliJ は、mapReduceTriplets は非推奨 (1.2.0 の時点) であり、aggregateMessages に置き換える必要があることを指摘しています。

// Find the oldest follower for each user
val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages()[(String, Int)](
  // For each edge send a message to the destination vertex with the attribute of the source vertex
  edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),
  // To combine messages take the message for the older follower
  (a, b) => if (a._2 > b._2) a else b
)

したがって、まったく同じコードを実行しますが、出力はありません。それは予想される結果ですか、それともaggregateMessagesの変化のために何かを変更する必要がありますか?

4

1 に答える 1

5

おそらく、次のようなものが必要です。

val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages[(String, Int)]
(
    // For each edge send a message to the destination vertex with the attribute of the source vertex
    sendMsg = { triplet => triplet.sendToDst(triplet.srcAttr.name, triplet.srcAttr.age) },
   // To combine messages take the message for the older follower
    mergeMsg = {(a, b) => if (a._2 > b._2) a else b}
)

aggregateMessages関数のシグネチャと便利な例は、Grapx プログラミング ガイドページで見つけることができます。お役に立てれば。

于 2015-01-31T23:08:28.867 に答える