3

Scala アクターを使用して階乗を計算する方法は?

たとえば、

def factorial(n: Int): BigInt = (BigInt(1) to BigInt(n)).par.product

どうもありがとう。

4

1 に答える 1

4

問題

入力を部分積に分割する必要があります。この部分積は、並列で計算できます。次に、部分積が乗算されて最終積が得られます。

これは、より広いクラスの問題に還元できます。いわゆる並列プレフィックス計算です。それについてはウィキペディアで読むことができます。

短いバージョン:a*b*c*d連想演算_ * _で計算する場合、計算を構造化するか、a*(b*(c*d))または(a*b)*(c*d). 2 番目の方法では、並列に計算a*bc*d、これらの部分的な結果から最終的な結果を計算できます。もちろん、入力値の数が多い場合は、これを再帰的に行うことができます。

解決

免責事項

これは宿題のように聞こえます。そこで、次の 2 つのプロパティを持つソリューションを提供します。

  1. 小さなバグが含まれています
  2. 問題を直接解決せずに、一般的に並列接頭辞を解決する方法を示します

このように、ソリューションをどのように構成する必要があるかはわかりますが、それを使用して彼女の宿題をごまかすことはできません。

ソリューションの詳細

まず、いくつかのインポートが必要です

import akka.event.Logging import java.util.concurrent.TimeUnit import scala.concurrent.duration.FiniteDuration import akka.actor._

次に、アクター間の通信用のヘルパー クラスをいくつか作成します。

case class Calculate[T](values : Seq[T], segment : Int, parallelLimit : Int, fn : (T,T) => T)

trait CalculateResponse
case class CalculationResult[T](result : T, index : Int) extends CalculateResponse
case object Busy extends CalculateResponse

アクターは受信者に忙しいことを伝える代わりに、スタッシュを使用するか、部分的な結果用に独自のキューを実装することもできます。ただし、この場合、送信者が並列計算をどの程度許可するかを決定する必要があると思います。

次に、アクターを作成します。

class ParallelPrefixActor[T] extends Actor {
  val log = Logging(context.system, this)
  val subCalculation = Props(classOf[ParallelPrefixActor[BigInt]])
  val fanOut = 2
  def receive = waitForCalculation

  def waitForCalculation : Actor.Receive = {
    case c : Calculate[T] =>
      log.debug(s"Start calculation for ${c.values.length} values, segment nr. ${c.index}, from ${c.values.head} to ${c.values.last}")
      if (c.values.length < c.parallelLimit) {
        log.debug("Calculating result direct")
        val result = c.values.reduceLeft(c.fn)
        sender ! CalculationResult(result, c.index)
      }else{
        val groupSize: Int = Math.max(1, (c.values.length / fanOut) + Math.min(c.values.length % fanOut, 1))
        log.debug(s"Splitting calculation for ${c.values.length} values up to ${fanOut} children, ${groupSize} elements each, limit ${c.parallelLimit}")
        def segments=c.values.grouped(groupSize)
        log.debug("Starting children")
        segments.zipWithIndex.foreach{case (values, index) =>
          context.actorOf(subCalculation) ! c.copy(values = values, index = index)
        }
        val partialResults: Vector[T] = segments.map(_.head).to[Vector]
        log.debug(s"Waiting for ${partialResults.length} results (${partialResults.indices})")
        context.become(waitForResults(segments.length, partialResults, c, sender), discardOld = true)
      }
  }
  def waitForResults(outstandingResults : Int, partialResults : Vector[T], originalRequest : Calculate[T], originalSender : ActorRef) : Actor.Receive = {
    case c : Calculate[_] => sender ! Busy
    case r : CalculationResult[T] =>
      log.debug(s"Putting result ${r.result} on position ${r.index} in ${partialResults.length}")
      val updatedResults = partialResults.updated(r.index, r.result)
      log.debug("Killing sub-worker")
      sender ! PoisonPill
      if (outstandingResults==1) {
        log.debug("Calculating result from partial results")
        val result = updatedResults.reduceLeft(originalRequest.fn)
        originalSender ! CalculationResult(result, originalRequest.index)
        context.become(waitForCalculation, discardOld = true)
      }else{
        log.debug(s"Still waiting for ${outstandingResults-1} results")
        // For fanOut > 2 one could here already combine consecutive partial results
        context.become(waitForResults(outstandingResults-1, updatedResults, originalRequest, originalSender), discardOld = true)
      }
  }
}

最適化

並列プレフィックス計算の使用は最適ではありません。大きな数の積を計算するアクターは、小さな数の積を計算するアクターよりもはるかに多くの作業を行います (たとえば、 を計算する場合、 を計算するよりも 1 * ... * 100 高速に計算できます)。そのため、数字をシャッフルすることをお勧めします。これにより、大きな数字と小さな数字が混在します。この場合、可換演算を使用するため、これが機能します。一般に、並列プレフィックス計算では、連想操作のみが機能する必要があります。1 * ... * 1090 * ... * 100

パフォーマンス

理論的には

少量のデータの場合、アクター ソリューションのパフォーマンスは、(並列コレクションを使用する) "単純な" ソリューションよりも劣ります。アクター ソリューションは、複雑な計算を行ったり、特殊なハードウェア (グラフィック カードや FPGA など) または複数のマシンに計算を分散したりする場合に役立ちます。アクターを使用すると、誰がどの計算を行うかを制御でき、「ハンギング計算」を再開することもできます。これにより、速度が大幅に向上します。

1 台のマシンで、不均一なメモリ アーキテクチャを使用している場合は、アクター ソリューションが役立つ場合があります。次に、メモリを特定のプロセッサに固定する方法でアクターを編成できます。

いくつかの測定

IntelliJ IDEA の Scala ワークシートを使用して実際のパフォーマンス測定を行いました。

まず、アクター システムをセットアップします。

// Setup the actor system
val system = ActorSystem("root")
// Start one calculation actor
val calculationStart = Props(classOf[ParallelPrefixActor[BigInt]])


val calcolon = system.actorOf(calculationStart, "Calcolon-BigInt")

val inbox = Inbox.create(system)

次に、時間を測定するヘルパー メソッドを定義しました。

// Helper function to measure time
def time[A] (id : String)(f: => A) = {
  val start = System.nanoTime()
  val result = f
  val stop = System.nanoTime()
  println(s"""Time for "${id}": ${(stop-start)*1e-6d}ms""")
  result
}

そして、いくつかのパフォーマンス測定を行いました。

// Test code
val limit = 10000
def testRange = (1 to limit).map(BigInt(_))

time("par product")(testRange.par.product)
val timeOut = FiniteDuration(240, TimeUnit.SECONDS)
inbox.send(calcolon, Calculate[BigInt]((1 to limit).map(BigInt(_)), 0, 10, _ * _))
time("actor product")(inbox.receive(timeOut))

time("par sum")(testRange.par.sum)
inbox.send(calcolon, Calculate[BigInt](testRange, 0, 5, _ + _))
time("actor sum")(inbox.receive(timeOut))

私は次の結果を得ました

> 「標準製品」の時間: 134.38289ms
  res0: scala.math.BigInt = 284625968091705451890641321211986889014805140170279923
  079417999427441134000376444377299078675778477581588406214231752883004233994015
  351873905242116138271617481982419982759241828925978789812425312059465996259867
  065601615720360323979263287367170557419759620994797203461536981198970926112775
  004841988454104755446424421365733030767036288258035489674611170973695786036701
  910715127305872810411586405612811653853259684258259955846881464304255898366493
  170592517172042765974074461334000541940524623034368691540594040662278282483715
  120383221786446271838229238996389928272218797024593876938030946273322925705554
  596900278752822425443480211275590191694254290289169072190970836905398737474524
  833728995218023632827412170402680867692104515558405671725553720158521328290342
  799898184493136...

  「アクター製品」の時間: 1310.217247ms
  res2: 任意 = 計算結果 (28462596809170545189064132121198688901480514017027
  992307941799942744113400037644437729907867577847758158840621423175288300423399
  401535187390524211613827161748198241998275924182892597878981242531205946599625
  986706560161572036032397926328736717055741975962099479720346153698119897092611
  277500484198845410475544642442136573303076703628825803548967461117097369578603
  670191071512730587281041158640561281165385325968425825995584688146430425589836
  649317059251717204276597407446133400054194052462303436869154059404066227828248
  371512038322178644627183822923899638992827221879702459387693803094627332292570
  555459690027875282242544348021127559019169425429028916907219097083690539873747
  452483372899521802363282741217040268086769210451555840567172555372015852132829
  034279989818449...

> 「額面」の時間: 6.488620999999999ms
  res3: scala.math.BigInt = 50005000

> "actor sum" の時間: 657.752832ms
  res5: 任意 = 計算結果(50005000,0)

アクター バージョンが並列コレクションを使用するよりもはるかに遅いことが簡単にわかります。

于 2014-02-07T14:17:51.790 に答える