私は最近、この同じ問題に苦しんでいましたが、(key: Int, (domain: String, count: Long))
. データセットはより単純ですが、ドキュメントに記載されているように、groupByKey を使用することによるスケーリング/パフォーマンスの問題がまだあります。
(K, V) ペアのデータセットで呼び出されると、(K, Iterable) ペアのデータセットを返します。注: 各キーに対して集計 (合計や平均など) を実行するためにグループ化する場合は、reduceByKey または CombineByKey を使用すると、パフォーマンスが大幅に向上します。
私の場合、入力が 100 万を超える非常に大きいため、すぐに問題に遭遇しましたIterable
。(K, Iterable<V>)
そのため、上位 N の並べ替えと取得に非常にコストがかかり、メモリの問題が発生する可能性があります。
掘り下げた後、以下の参照を参照してください。これは、combineByKey を使用して同じタスクを実行およびスケーリングする方法で達成する完全な例です。
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object TopNForKey {
var SampleDataset = List(
(1, ("apple.com", 3L)),
(1, ("google.com", 4L)),
(1, ("stackoverflow.com", 10L)),
(1, ("reddit.com", 15L)),
(2, ("slashdot.org", 11L)),
(2, ("samsung.com", 1L)),
(2, ("apple.com", 9L)),
(3, ("microsoft.com", 5L)),
(3, ("yahoo.com", 3L)),
(3, ("google.com", 4L)))
//sort and trim a traversable (String, Long) tuple by _2 value of the tuple
def topNs(xs: TraversableOnce[(String, Long)], n: Int) = {
var ss = List[(String, Long)]()
var min = Long.MaxValue
var len = 0
xs foreach { e =>
if (len < n || e._2 > min) {
ss = (e :: ss).sortBy((f) => f._2)
min = ss.head._2
len += 1
}
if (len > n) {
ss = ss.tail
min = ss.head._2
len -= 1
}
}
ss
}
def main(args: Array[String]): Unit = {
val topN = 2
val sc = new SparkContext("local", "TopN For Key")
val rdd = sc.parallelize(SampleDataset).map((t) => (t._1, t._2))
//use combineByKey to allow spark to partition the sorting and "trimming" across the cluster
val topNForKey = rdd.combineByKey(
//seed a list for each key to hold your top N's with your first record
(v) => List[(String, Long)](v),
//add the incoming value to the accumulating top N list for the key
(acc: List[(String, Long)], v) => topNs(acc ++ List((v._1, v._2)), topN).toList,
//merge top N lists returned from each partition into a new combined top N list
(acc: List[(String, Long)], acc2: List[(String, Long)]) => topNs(acc ++ acc2, topN).toList)
//print results sorting for pretty
topNForKey.sortByKey(true).foreach((t) => {
println(s"key: ${t._1}")
t._2.foreach((v) => {
println(s"----- $v")
})
})
}
}
そして、私が戻ってきたrddで得たもの...
(1, List(("google.com", 4L),
("stackoverflow.com", 10L))
(2, List(("apple.com", 9L),
("slashdot.org", 15L))
(3, List(("google.com", 4L),
("microsoft.com", 5L))
参考文献
https://www.mail-archive.com/user@spark.apache.org/msg16827.html
https://stackoverflow.com/a/8275562/807318
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions