3

私は Apache Spark の初心者です。重みの合計が RDD の定数値より大きいすべてのグループを除外したいと考えています。「重み」マップも RDD です。これは小さなサイズのデモです。フィルタリングするグループは「groups」に格納され、定数値は 12 です。

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
  val allw = inp.split(",").map(wm(_)).sum
  allw > 12
}
val result = groups.filter(isheavy)

入力データが非常に大きい場合 (たとえば、10 GB を超える場合)、常に「Java ヒープがメモリ不足です」というエラーが発生します。分散RDDをJVMのJavaオブジェクトに変換するため、「weights.toArray.toMap」が原因かどうか疑問に思いました。だから私はRDDで直接フィルタリングしようとしました:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
  val items = inp.split(",")
  val wm = items.map(x => weights.filter(_._1 == x).first._2)
  wm.sum > 12
}
val result = groups.filter(isheavy)

このスクリプトを Spark シェルにロードした後に実行するとresult.collect、「java.lang.NullPointerException」エラーが発生しました。RDD が別の RDD で操作されると、nullpointer 例外が発生するので、Redis に重みを入れることを提案する人がいます。

では、「重み」をマップに変換したり、Redis に入れたりせずに「結果」を取得するにはどうすればよいでしょうか? 外部データストア サービスの助けを借りずに、別のマップのような RDD に基づいて RDD をフィルター処理するソリューションがある場合は? ありがとう!

4

2 に答える 2

4

グループが一意であるとします。それ以外の場合は、まず、distinct などで一意にします。グループまたは重みが小さい場合は、簡単なはずです。グループと重みの両方が大きい場合は、これを試すことができます。これはよりスケーラブルかもしれませんが、複雑に見えます。

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
//map groups to be (a, (a,b,c,d)), (b, (a,b,c,d), (c, (a,b,c,d)....
val g1=groups.flatMap(s=>s.split(",").map(x=>(x, Seq(s))))
//j will be (a, ((a,b,c,d),3)...
val j = g1.join(weights)
//k will be ((a,b,c,d), 3), ((a,b,c,d),2) ...
val k = j.map(x=>(x._2._1, x._2._2))
//l will be ((a,b,c,d), (3,2,5,1))...
val l = k.groupByKey()
//filter by sum the 2nd
val m = l.filter(x=>{var sum = 0; x._2.foreach(a=> {sum=sum+a});sum > 12})
//we only need the original list
val result=m.map(x=>x._1)
//don't do this in real product, otherwise, all results go to driver.instead using saveAsTextFile, etc
scala> result.foreach(println)
List(e,g)
List(b,c,e)
于 2014-09-26T03:20:08.803 に答える