2

私は GraphFrames と Scala にかなり慣れていません。私はある種のラベル伝播アルゴリズムを書いています(ライブラリのものとは非常に異なります)。基本的に、各頂点には配列「memVector」があり、エッジには float 値「floatWeights」があります。各頂点の memVector を更新して、すべての隣接頂点からの (floatWeights * memVector) の合計にします。これは私が同じために書いたコードです:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.graphframes._
import org.graphframes.lib.AggregateMessages
import org.apache.spark.sql.functions.udf

val sqlContext = new SQLContext(sc)

val edges = spark.read.parquet("code/SampleGraphEdge")
val vertices = spark.read.parquet("code/SampleGraphVer")

val toInteger: String => Int = _.toInt
val toIntegerUDF = udf(toInteger)
val newEdges = edges.withColumn("floatWeights", toIntegerUDF('weights)).drop("weights")

val graph = GraphFrame(vertices, newEdges)
val AM = AggregateMessages

val msgToSrc = AM.dst("memVector")
val msgToDst = AM.src("memVector")
val msgFromEdge = AM.edge("floatWeights")

def aggfunc(msg: org.apache.spark.sql.Column) = sum(msg.getField("weights") * AM.msg.getField("memVector"))

val agg = graph.aggregateMessages.sendToSrc(msgToSrc).sendToDst(msgToDst).sendToSrc(sendFromEdge).sendToDst(sendFromEdge).agg(aggfunc(AM.msg).as("UpdatedVector"))

配列と浮動小数点数を直接掛けることができないため、私が書いた aggfunc は正しくありません。上記をspark-shellで実行していますが、最後の行で次のエラーが発生しています:

org.apache.spark.sql.AnalysisException: Can't extract value from MSG#750;
at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:73)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:613)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:605)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:311)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:269)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:283)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:283)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$8.apply(QueryPlan.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:288)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:605)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:547)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:547)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:484)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:51)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.RelationalGroupedDataset.toDF(RelationalGroupedDataset.scala:62)
at org.apache.spark.sql.RelationalGroupedDataset.agg(RelationalGroupedDataset.scala:222)
at org.graphframes.lib.AggregateMessages.agg(AggregateMessages.scala:127)
... 50 elided

私はそれに近づいていますか?回避策/解決策は大歓迎です。

4

0 に答える 0