10

Scala で変更可能なマップを使用して、文字列を見た回数を追跡したいとします。シングルスレッドのコンテキストでは、これは簡単です:

import scala.collection.mutable.{ Map => MMap }

class Counter {
  val counts = MMap.empty[String, Int].withDefaultValue(0)

  def add(s: String): Unit = counts(s) += 1
}

残念ながら、と はアトミックに発生しないため、これはスレッドセーフでgetはありません。update

並行マップは、可変マップ API にいくつかのアトミック操作を追加しますが、私が必要とするものではありません。これは次のようになります。

def replace(k: A, f: B => B): Option[B]

私はScalaSTMを使用できることを知っていますTMap:

import scala.concurrent.stm._

class Counter {
  val counts =  TMap.empty[String, Int]

  def add(s: String): Unit = atomic { implicit txn =>
    counts(s) = counts.get(s).getOrElse(0) + 1
  }
}

しかし (今のところ) それはまだ追加の依存関係です。他のオプションには、アクター (別の依存関係)、同期 (潜在的に効率が悪い)、または Java のアトミック参照(あまり慣用的ではない) が含まれます。

一般的に、私は Scala での変更可能なマップを避けますが、時々この種のものが必要になり、最近では STM アプローチを使用しました (指を交差させて、ナイーブに噛まれないようにする代わりに)解決)。

ここには多くのトレードオフがあることを知っています (余分な依存関係、パフォーマンス、明確さなど)。しかし、Scala 2.10 でこの問題に対する「正しい」答えのようなものはありますか?

4

4 に答える 4

3

最も簡単な解決策は間違いなく同期です。あまり競合がなければ、パフォーマンスはそれほど悪くないかもしれません。

replaceそれ以外の場合は、独自の STM のような実装をロールアップしようとすることができます。このようなことができるかもしれません:

object ConcurrentMapOps {
  private val rng = new util.Random
  private val MaxReplaceRetryCount = 10
  private val MinReplaceBackoffTime: Long = 1
  private val MaxReplaceBackoffTime: Long = 20
}
implicit class ConcurrentMapOps[A, B]( val m: collection.concurrent.Map[A,B] ) {
  import ConcurrentMapOps._
  private def replaceBackoff() {
    Thread.sleep( (MinReplaceBackoffTime + rng.nextFloat * (MaxReplaceBackoffTime - MinReplaceBackoffTime) ).toLong ) // A bit crude, I know
  }

  def replace(k: A, f: B => B): Option[B] = {
    m.get( k ) match {
      case None => return None
      case Some( old ) =>
        var retryCount = 0
        while ( retryCount <= MaxReplaceRetryCount ) {
          val done = m.replace( k, old, f( old ) )
          if ( done ) {
            return Some( old )
          }
          else {         
            retryCount += 1
            replaceBackoff()
          }
        }
        sys.error("Could not concurrently modify map")
    }
  }
}

衝突の問題は特定のキーに限られていることに注意してください。2 つのスレッドが同じマップにアクセスしているが、異なるキーで動作している場合、衝突は発生せず、置換操作は常に最初に成功します。衝突が検出された場合は、少し待ってから (スレッドが同じキーを求めて永久に競合する可能性を最小限に抑えるためにランダムな時間)、再試行します。

これが本番環境に対応していることを保証することはできませんが (私はちょうど今それを投げました)、うまくいくかもしれません。

UPDATE : もちろん (Ionuş G. Stan が指摘したように)、必要なのが値のインクリメント/デクリメントだけである場合、JavaConcurrentHashMapは既にこれらの操作をロックフリーの方法で提供しています。replace上記の解決策は、変換関数をパラメーターとして受け取るより一般的な方法が必要な場合に適用されます。

于 2013-08-09T15:46:34.610 に答える
2

マップが val としてそこに座っているだけの場合、問題が発生しています。ユースケースを満たしている場合は、次のようなものをお勧めします

class Counter {
  private[this] myCounts = MMap.empty[String, Int].withDefaultValue(0)
  def counts(s: String) = myCounts.synchronized { myCounts(s) }
  def add(s: String) = myCounts.synchronized { myCounts(s) += 1 }
  def getCounts = myCounts.synchronized { Map[String,Int]() ++ myCounts }
}

低競合の使用のために。競合が多い場合は、そのような使用をサポートするように設計された同時実行マップ (例: java.util.concurrent.ConcurrentHashMap) を使用し、値を でラップする必要がありますAtomicWhatever

于 2013-08-09T17:15:27.760 に答える
2

future ベースのインターフェースを使用しても問題ない場合:

trait SingleThreadedExecutionContext {
  val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
}

class Counter extends SingleThreadedExecutionContext {
  private val counts = MMap.empty[String, Int].withDefaultValue(0)

  def get(s: String): Future[Int] = future(counts(s))(ec)

  def add(s: String): Future[Unit] = future(counts(s) += 1)(ec)
}

テストは次のようになります。

class MutableMapSpec extends Specification {

  "thread safe" in {

    import ExecutionContext.Implicits.global

    val c = new Counter
    val testData = Seq.fill(16)("1")
    await(Future.traverse(testData)(c.add))
    await(c.get("1")) mustEqual 16
  }
}
于 2013-08-09T18:21:59.600 に答える