6

NgroupByKey の後にトップ項目を取得し、(以下の)の型を値がどこにあるかRDDに変換したいと思いますtopNPerGroupRDD[(String, Int)]List[Int]flatten

dataは_

val data = sc.parallelize(Seq("foo"->3, "foo"->1, "foo"->2,
                              "bar"->6, "bar"->5, "bar"->4))

Nグループごとの上位アイテムは次のように計算されます。

val topNPerGroup: RDD[(String, List[Int]) = data.groupByKey.map { 
   case (key, numbers) => 
       key -> numbers.toList.sortBy(-_).take(2)
}

結果は

(bar,List(6, 5))
(foo,List(3, 2))

によって印刷された

topNPerGroup.collect.foreach(println)

私が達成した場合、topNPerGroup.collect.foreach(println)生成されます (期待される結果! )

(bar, 6)
(bar, 5)
(foo, 3)
(foo, 2)
4

4 に答える 4

7

Spark 1.4.0 はこの問題を解決します。

https://github.com/apache/spark/commit/5e6ad24ff645a9b0f63d9c0f17193550963aa0a7をご覧ください

これBoundedPriorityQueueaggregateByKey

def topByKey(num: Int)(implicit ord: Ordering[V]): RDD[(K, Array[V])] = {
  self.aggregateByKey(new BoundedPriorityQueue[V](num)(ord))(
    seqOp = (queue, item) => {
      queue += item
    },
    combOp = (queue1, queue2) => {
      queue1 ++= queue2
    }
  ).mapValues(_.toArray.sorted(ord.reverse))  // This is an min-heap, so we reverse the order.
}
于 2015-06-26T12:37:53.077 に答える
6

私は最近、この同じ問題に苦しんでいましたが、(key: Int, (domain: String, count: Long)). データセットはより単純ですが、ドキュメントに記載されているように、groupByKey を使用することによるスケーリング/パフォーマンスの問題がまだあります。

(K, V) ペアのデータセットで呼び出されると、(K, Iterable) ペアのデータセットを返します。注: 各キーに対して集計 (合計や平均など) を実行するためにグループ化する場合は、reduceByKey または CombineByKey を使用すると、パフォーマンスが大幅に向上します。

私の場合、入力が 100 万を超える非常に大きいため、すぐに問題に遭遇しましたIterable(K, Iterable<V>)そのため、上位 N の並べ替えと取得に非常にコストがかかり、メモリの問題が発生する可能性があります。

掘り下げた後、以下の参照を参照してください。これは、combineByKey を使用して同じタスクを実行およびスケーリングする方法で達成する完全な例です。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TopNForKey {

  var SampleDataset = List(
    (1, ("apple.com", 3L)),
    (1, ("google.com", 4L)),
    (1, ("stackoverflow.com", 10L)),
    (1, ("reddit.com", 15L)),
    (2, ("slashdot.org", 11L)),
    (2, ("samsung.com", 1L)),
    (2, ("apple.com", 9L)),
    (3, ("microsoft.com", 5L)),
    (3, ("yahoo.com", 3L)),
    (3, ("google.com", 4L)))

  //sort and trim a traversable (String, Long) tuple by _2 value of the tuple
  def topNs(xs: TraversableOnce[(String, Long)], n: Int) = {
    var ss = List[(String, Long)]()
    var min = Long.MaxValue
    var len = 0
    xs foreach { e =>
      if (len < n || e._2 > min) {
        ss = (e :: ss).sortBy((f) => f._2)
        min = ss.head._2
        len += 1
      }
      if (len > n) {
        ss = ss.tail
        min = ss.head._2
        len -= 1
      }
    }
    ss
  }

  def main(args: Array[String]): Unit = {

    val topN = 2
    val sc = new SparkContext("local", "TopN For Key")
    val rdd = sc.parallelize(SampleDataset).map((t) => (t._1, t._2))

    //use combineByKey to allow spark to partition the sorting and "trimming" across the cluster
    val topNForKey = rdd.combineByKey(
      //seed a list for each key to hold your top N's with your first record
      (v) => List[(String, Long)](v),
      //add the incoming value to the accumulating top N list for the key
      (acc: List[(String, Long)], v) => topNs(acc ++ List((v._1, v._2)), topN).toList,
      //merge top N lists returned from each partition into a new combined top N list
      (acc: List[(String, Long)], acc2: List[(String, Long)]) => topNs(acc ++ acc2, topN).toList)

    //print results sorting for pretty
    topNForKey.sortByKey(true).foreach((t) => {
      println(s"key: ${t._1}")
      t._2.foreach((v) => {
        println(s"----- $v")
      })

    })

  }

}

そして、私が戻ってきたrddで得たもの...

(1, List(("google.com", 4L),
         ("stackoverflow.com", 10L))
(2, List(("apple.com", 9L),
         ("slashdot.org", 15L))
(3, List(("google.com", 4L),
         ("microsoft.com", 5L))

参考文献

https://www.mail-archive.com/user@spark.apache.org/msg16827.html

https://stackoverflow.com/a/8275562/807318

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

于 2014-12-05T20:23:52.730 に答える
4

あなたの質問は少し混乱していますが、これはあなたが探しているものだと思います:

val flattenedTopNPerGroup = 
    topNPerGroup.flatMap({case (key, numbers) => numbers.map(key -> _)})

そしてreplでは、あなたが望むものを出力します:

flattenedTopNPerGroup.collect.foreach(println)
(foo,3)
(foo,2)
(bar,6)
(bar,5)
于 2014-12-03T15:43:35.140 に答える