10

collection.parallel.mutable.ParHashMap (または他の並列コレクション)の非常に大きなインスタンスが与えられた場合、特定の、たとえば50の一致数が見つかったら、どのようにしてフィルタリング並列スキャンを中止できますか?

スレッドセーフな「外部」データ構造に中間一致を蓄積しようとしたり、結果カウントを含む外部AtomicIntegerを維持したりすることは、通常のcollection.mutable.HashMapを使用して単一コアを100にペギングするよりも、4コアで2〜3倍遅いようです。%。

Par *コレクションの検索または存在は、「内部」で中止されることを認識しています。これを一般化して複数の結果を見つける方法はありますか?

これは、ParHashMapで約79,000エントリの場合でも2〜3倍遅いように見え、maxResultsの結果よりも多くの結果を結果のCHMに詰め込むという問題もあります(これはおそらく、incrementAndGetの後、 breakの前にスレッドがプリエンプトされたためです)他のスレッドが要素を追加できるようにします)。更新:速度の低下は、counter.incrementAndGet()で競合するワーカースレッドが原因であるようです。これはもちろん、並列スキャン全体の目的を無効にします:-(

def find(filter: Node => Boolean, maxResults: Int): Iterable[Node] =
{
  val counter = new AtomicInteger(0)
  val results = new ConcurrentHashMap[Key,  Node](maxResults)

  import util.control.Breaks._

  breakable
  {
    for ((key, node) <- parHashMap if filter(node))
    {
      results.put(key, node)
      val total = counter.incrementAndGet()
      if (total > maxResults) break
    }
  }

  results.values.toArray(new Array[Node](results.size))
}
4

3 に答える 3

2

最初に、変数 maxResults がスレッドローカルになる並列スキャンを実行します。これにより、最大 (maxResults * numberOfThreads) の結果が見つかります。

次に、シングル スレッド スキャンを実行して maxResults に減らします。

于 2011-11-24T11:17:14.147 に答える
1

私はあなたの事件について興味深い調査を行いました。

調査推論

問題は入力 Map の可変性にあるのではないかと考えました。その理由を説明しようと思います。ウィキペディアでわかるように、HashMap 実装はデータをさまざまなバケットに編成します。

ウィキペディアのハッシュマップ

Java の最初のスレッド セーフなコレクションである同期コレクションは、基礎となる実装のすべてのメソッドを同期することに基づいていたため、パフォーマンスが低下しました。ConcurrentHashMap などのよりパフォーマンスの高い Concurrent Collection にもたらされたさらなる研究と思考により、どちらのアプローチがよりスマートでしたか?特定のロックで各バケットを保護しないのはなぜでしょうか?

私の感覚によると、パフォーマンスの問題は次の理由で発生します。

  • フィルターを並行して実行すると、一部のスレッドが同じバケットに一度にアクセスする際に競合し、同じロックにヒットします。これは、マップが可変であるためです。
  • 実際に結果のサイズを確認しながら、カウンターを持って結果の数を確認します。コレクションを構築するスレッドセーフな方法がある場合は、スレッドセーフなカウンターも必要ありません。

調査結果

テスト ケースを作成したところ、自分が間違っていたことがわかりました。問題は、出力マップの同時性にあります。実際、要素をマップに配置するときではなく、要素を反復するときに衝突が発生します。さらに、値の結果のみが必要なため、キー、ハッシュ、およびすべてのマップ機能は必要ありません。AtomicCounterを削除してマップのみを使用しresultて、バージョンがどのように機能するかを十分に収集したかどうかをテストすることは興味深いかもしれません。

Scala 2.9.2 では次のコードに注意してください。別の投稿で、並列バージョンと非並列バージョンに 2 つの異なる関数が必要な理由を説明しています: Calling map on a parallel collection via a reference to an祖先型

object MapPerformance {

  val size = 100000
  val items = Seq.tabulate(size)( x => (x,x*2))


  val concurrentParallelMap = ImmutableParHashMap(items:_*)

  val concurrentMutableParallelMap = MutableParHashMap(items:_*)

  val unparallelMap = Map(items:_*)


  class ThreadSafeIndexedSeqBuilder[T](maxSize:Int) {
    val underlyingBuilder = new VectorBuilder[T]()
    var counter = 0
    def sizeHint(hint:Int) { underlyingBuilder.sizeHint(hint) }
    def +=(item:T):Boolean ={
      synchronized{
        if(counter>=maxSize)
          false
        else{
          underlyingBuilder+=item
          counter+=1
          true
        }
      }
    }
    def result():Vector[T] = underlyingBuilder.result()

  }

  def find(map:ParMap[Int,Int],filter: Int => Boolean, maxResults: Int): Iterable[Int] =
  {

    // we already know the maximum size
    val resultsBuilder = new ThreadSafeIndexedSeqBuilder[Int](maxResults)
    resultsBuilder.sizeHint(maxResults)

    import util.control.Breaks._
    breakable
    {
      for ((key, node) <- map if filter(node))
      {
        val newItemAdded = resultsBuilder+=node
        if (!newItemAdded)
          break()

      }
    }
    resultsBuilder.result().seq

  }

  def findUnParallel(map:Map[Int,Int],filter: Int => Boolean, maxResults: Int): Iterable[Int] =
  {

    // we already know the maximum size
    val resultsBuilder = Array.newBuilder[Int]
    resultsBuilder.sizeHint(maxResults)

    var counter = 0
      for {
        (key, node) <- map if filter(node)
        if counter < maxResults
      }{
        resultsBuilder+=node
        counter+=1
      }

    resultsBuilder.result()

  }

  def measureTime[K](f: => K):(Long,K) = {
    val startMutable = System.currentTimeMillis()
    val result = f
    val endMutable = System.currentTimeMillis()
    (endMutable-startMutable,result)
  }

  def main(args:Array[String]) = {
    val maxResultSetting=10
    (1 to 10).foreach{
      tryNumber =>
        println("Try number " +tryNumber)
        val (mutableTime, mutableResult) = measureTime(find(concurrentMutableParallelMap,_%2==0,maxResultSetting))
        val (immutableTime, immutableResult) = measureTime(find(concurrentMutableParallelMap,_%2==0,maxResultSetting))
        val (unparallelTime, unparallelResult) = measureTime(findUnParallel(unparallelMap,_%2==0,maxResultSetting))
        assert(mutableResult.size==maxResultSetting)
        assert(immutableResult.size==maxResultSetting)
        assert(unparallelResult.size==maxResultSetting)
        println(" The mutable version has taken " + mutableTime + " milliseconds")
        println(" The immutable version has taken " + immutableTime + " milliseconds")
        println(" The unparallel version has taken " + unparallelTime + " milliseconds")
     }
  }

}

このコードを使用すると、体系的に並列 (入力マップの可変バージョンと不変バージョンの両方) が、私のマシンの非並列よりも約 3.5 倍速くなります。

于 2013-01-18T18:05:50.153 に答える
0

イテレータを取得してから、(述語を使用して) フィルタリングして必要な要素の数を取得する遅延リスト (ストリーム) を作成することができます。非正格であるため、この要素の「取得」は評価されません。その後、全体に「.par」を追加して実行を強制し、並列化を実現できます。

コード例:

ランダムな値を持つ並列化されたマップ (並列ハッシュ マップをシミュレート):

scala> myMap
res14: scala.collection.parallel.immutable.ParMap[Int,Int] = ParMap(66978401 -> -1331298976, 256964068 -> 126442706, 1698061835 -> 1622679396, -1556333580 -> -1737927220, 791194343 -> -591951714, -1907806173 -> 365922424, 1970481797 -> 162004380, -475841243 -> -445098544, -33856724 -> -1418863050, 1851826878 -> 64176692, 1797820893 -> 405915272, -1838192182 -> 1152824098, 1028423518 -> -2124589278, -670924872 -> 1056679706, 1530917115 -> 1265988738, -808655189 -> -1742792788, 873935965 -> 733748120, -1026980400 -> -163182914, 576661388 -> 900607992, -1950678599 -> -731236098)

イテレータを取得し、イテレータから Stream を作成してフィルタリングします。この場合、述語は (マップの値メンバーの) ペアのみを受け入れます。私は 10 個の偶数要素を取得したいので、強制した場合にのみ評価される 10 個の要素を取得します。

scala> val mapIterator = myMap.toIterator
mapIterator: Iterator[(Int, Int)] = HashTrieIterator(20)


scala> val r = Stream.continually(mapIterator.next()).filter(_._2 % 2 == 0).take(10)
r: scala.collection.immutable.Stream[(Int, Int)] = Stream((66978401,-1331298976), ?)

最後に、予定通り 10 要素しか得られない評価を強制します。

scala> r.force
res16: scala.collection.immutable.Stream[(Int, Int)] = Stream((66978401,-1331298976), (256964068,126442706), (1698061835,1622679396), (-1556333580,-1737927220), (791194343,-591951714), (-1907806173,365922424), (1970481797,162004380), (-475841243,-445098544), (-33856724,-1418863050), (1851826878,64176692))

このようにして、必要な数の要素のみを取得し (残りの要素を処理する必要はありません)、ロック、アトミック、またはブレークなしでプロセスを並列化します。

これをあなたのソリューションと比較して、それが良いかどうかを確認してください。

于 2011-11-10T00:45:55.747 に答える