0

以下のように定義された HashMap のこの配列があります

var distinctElementsDefinitionMap: scala.collection.mutable.ArrayBuffer[HashMap[String, Int]] = new scala.collection.mutable.ArrayBuffer[HashMap[String, Int]](300) with scala.collection.mutable.SynchronizedBuffer[HashMap[String, Int]]

今、私は300要素の並列コレクションを持っています

val max_length = 300
val columnArray = (0 until max_length).toParArray
import scala.collection.parallel.ForkJoinTaskSupport
columnArray.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(100))
columnArray foreach(i => {
    // Do Some Computation and get a HashMap
    var distinctElementsMap: HashMap[String, Int] = //Some Value
    //This line might result in Concurrent Access Exception
    distinctElementsDefinitionMap.update(i, distinctElementsMap)
})

上記で定義されたforeachループ内で計算集約型のタスクを実行しています。columnArray計算が完了したら、各スレッドがdistinctElementsDefinitionMap配列の特定のエントリを更新するようにします。各スレッドは、それを実行するスレッドに固有の特定のインデックス値のみを更新します。複数のスレッドが同時に書き込みを行う可能性があるため、配列のエントリのこの更新が安全かどうかを知りたいですか? そうでない場合、synchronizedそれを行う方法があるので、スレッドセーフですか? ありがとう!

更新: これは実際には安全な方法ではないようです。java.util.ConcurrentModificationException 並列コレクションを使用しているときにこれを回避する方法についてのヒントを取得しています。

4

1 に答える 1

0

並列化されていると.groupBy判断できる限り、操作を使用します( などの他の方法とは異なります).sorted

case class Row(a: String, b: String, c: String)
val data = Vector(
  Row("foo", "", ""), 
  Row("bar", "", ""), 
  Row("foo", "", "")
)

data.par.groupBy(x => x.a).seq
// Map(bar -> ParVector(Row(bar,,)), foo -> ParVector(Row(foo,,), Row(foo,,)))

あなたがアイデアを得たことを願っています。

または、RAM で行ではなく各列で処理を並列化できる場合は、現在のアプローチよりもはるかに効率的でなければなりません (競合が少ない)。

val columnsCount = 3 // 300 in your case
Vector.range(0, columnsCount).par.map { column => 
  data.groupBy(row => row(column))
}.seq 

ただし、単一の列でもメモリの問題が発生する可能性があります (8M 行はかなり多い可能性があります)。

于 2014-07-11T21:50:04.120 に答える