3

私の質問は、 Scala の List[List[T]] 内の各要素の回数のカウントと非常に似ていますが、並列コレクションを含む効率的なソリューションが必要な点が異なります。

vec具体的には、Int の短い (~10) リストのx大きな (~10^7) ベクトルを持っていxますMap[Int,Int]。異なる整数の数は 10^6 のオーダーです。

これを実行する必要があるマシンにはかなりの量のメモリ (150GB) とコア数 (>100) があるため、これには並列コレクションが適しているようです。以下のコードは良いアプローチですか?

val flatpvec = vec.par.flatten
val flatvec = flatpvec.seq
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatvec.count(_ == x)))
counts.toMap

または、より良い解決策はありますか?.seq 変換について疑問がある場合: 何らかの理由で、次のコードは小さな例であっても終了しないようです:

val flatpvec = vec.par.flatten
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatpvec.count(_ == x)))
counts.toMap
4

2 に答える 2

3

これは何かをします。 順次折り畳みの結果も組み合わせる点を除いてaggregateは似ています。fold

更新: にオーバーヘッドがあることは驚くべきことではありませんが.par.groupBy、一定の要因に驚きました。これらの数字では、そのように数えることは決してありません。また、メモリを増やす必要がありました。

結果マップを構築するために使用される興味深い手法については、概要からリンクされているこのペーパーで説明されています。(中間結果を巧みに保存し、最後にそれらを並列に結合します。)

groupByしかし、本当にカウントだけが必要な場合は、ターンの中間結果をコピーするとコストがかかることがわかります。

数値は、順次groupBy、並列、および最後に比較していaggregateます。

apm@mara:~/tmp$ scalacm countints.scala ; scalam -J-Xms8g -J-Xmx8g -J-Xss1m countints.Test
GroupBy: Starting...
Finished in 12695
GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Par GroupBy: Starting...
Finished in 51481
Par GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Aggregate: Starting...
Finished in 2672
Aggregate: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))

テスト コードに魔法のようなものは何もありません。

import collection.GenTraversableOnce
import collection.concurrent.TrieMap
import collection.mutable

import concurrent.duration._

trait Timed {
  def now = System.nanoTime
  def timed[A](op: =>A): A =  {
    val start = now
    val res = op
    val end = now
    val lapsed = (end - start).nanos.toMillis
    Console println s"Finished in $lapsed"
    res
  }
  def showtime(title: String, op: =>GenTraversableOnce[(Int,Int)]): Unit = {
    Console println s"$title: Starting..."
    val res = timed(op)
    //val showable = res.toIterator.min   //(res.toIterator take 10).toList
    val showable = res.toList.sorted take 10
    Console println s"$title: $showable"
  }
}

興味のあるランダムなデータを生成します。

object Test extends App with Timed {

  val upto = math.pow(10,6).toInt
  val ran = new java.util.Random
  val ten = (1 to 10).toList
  val maxSamples = 1000
  // samples of ten random numbers in the desired range
  val samples = (1 to maxSamples).toList map (_ => ten map (_ => ran nextInt upto))
  // pick a sample at random
  def anyten = samples(ran nextInt maxSamples)
  def mag = 7
  val data: Vector[List[Int]] = Vector.fill(math.pow(10,mag).toInt)(anyten)

の逐次操作と結合操作がaggregateタスクから呼び出され、その結果が volatile var に割り当てられます。

  def z: mutable.Map[Int,Int] = mutable.Map.empty[Int,Int]
  def so(m: mutable.Map[Int,Int], is: List[Int]) = {
    for (i <- is) {
      val v = m.getOrElse(i, 0)
      m(i) = v + 1
    }
    m
  }
  def co(m: mutable.Map[Int,Int], n: mutable.Map[Int,Int]) = {
    for ((i, count) <- n) {
      val v = m.getOrElse(i, 0)
      m(i) = v + count
    }
    m
  }
  showtime("GroupBy", data.flatten groupBy identity map { case (k, vs) => (k, vs.size) })
  showtime("Par GroupBy", data.flatten.par groupBy identity map { case (k, vs) => (k, vs.size) })
  showtime("Aggregate", data.par.aggregate(z)(so, co))
}
于 2013-08-20T17:40:10.607 に答える