GraphFrames を使い始めたばかりで、ドキュメントに従っていますが、aggregateMessages 関数から結果を取得できません (空のデータフレームが返されます)。これが私の問題の単純化された例です: 私の頂点RDDが頂点属性のないtestGraph
単一の頂点のみで構成され、Y
私のedgeRDDが次のような2つのレコードで構成されるように呼び出されたGraphFramesオブジェクト:
| src | dst | min_ts1 | min_ts2 |
| X | Y | 20 | null |
| Y | X | null | -10 |
min_ts1
ここで、 の値を にdst
送信min_ts2
し、 に送信する単純なアルゴリズムを実装したいと考えていますsrc
。このアルゴリズムを実装するために使用しているコードは次のとおりです。
import org.graphframes.lib.AggregateMessages
import org.apache.spark.sql.functions._
val AM = AggregateMessages
val msgToSrc = AM.edge("min_ts2)
val msgToDst = AM.edge("min_ts1")
val delay = testGraph
.aggregateMessages
.sendToSrc(msgToSrc)
.sendToDst(msgToDst)
.agg(sum(AM.msg).as("avg_time_delay"))
ここにいくつかの null 値があることに気付きましたが、いずれにせよ、メッセージ パッシング アルゴリズムが次のことを行うことを期待します: 最初のレコードを見て、 to のメッセージと to のメッセージを送信20
します。次に、2 番目のレコードを見て、X へのメッセージと へのメッセージを送信します。最後に、結果が のメッセージの合計が であることを示し、頂点RDD に含まれていないため、結果に記録がないことを期待します。また、頂点RDDに含まれている場合、両方のメッセージが.Y
null
X
null
-10
Y
Y
10
X
X
null
null
ただし、取得しているのは空のRDDです。空の結果が得られる理由を誰かが理解するのを手伝ってくれませんか?