私は Apache Spark の初心者です。重みの合計が RDD の定数値より大きいすべてのグループを除外したいと考えています。「重み」マップも RDD です。これは小さなサイズのデモです。フィルタリングするグループは「groups」に格納され、定数値は 12 です。
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
val allw = inp.split(",").map(wm(_)).sum
allw > 12
}
val result = groups.filter(isheavy)
入力データが非常に大きい場合 (たとえば、10 GB を超える場合)、常に「Java ヒープがメモリ不足です」というエラーが発生します。分散RDDをJVMのJavaオブジェクトに変換するため、「weights.toArray.toMap」が原因かどうか疑問に思いました。だから私はRDDで直接フィルタリングしようとしました:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
val items = inp.split(",")
val wm = items.map(x => weights.filter(_._1 == x).first._2)
wm.sum > 12
}
val result = groups.filter(isheavy)
このスクリプトを Spark シェルにロードした後に実行するとresult.collect
、「java.lang.NullPointerException」エラーが発生しました。RDD が別の RDD で操作されると、nullpointer 例外が発生するので、Redis に重みを入れることを提案する人がいます。
では、「重み」をマップに変換したり、Redis に入れたりせずに「結果」を取得するにはどうすればよいでしょうか? 外部データストア サービスの助けを借りずに、別のマップのような RDD に基づいて RDD をフィルター処理するソリューションがある場合は? ありがとう!