5

Scala 2.8 では、アクターを起動すると、メッセージ パッシングを介して通信できます。これは、最終的な Exit() メッセージ、または自分のプロトコルに合うと判断したものを送信できることを意味します。

しかし、アクターが終了したかどうかを確認するにはどうすればよいでしょうか? マスター アクターがいくつかのワーカー アクターを開始し、それが最終的な回答であるかどうか (つまり、アクターがまだ動作しているか、またはすべて終了したか) を確認するたびに、単純に回答を待つというタスクを自分が持っていることは容易に想像できます。

もちろん、「完了しました」というメッセージを全員に送り返してからカウントすることもできますが、これではどうにもなりません。

worker-actor の完了をテストする際のベスト プラクティスは何ですか?

編集#1

Futures を検討していますが、問題があります。誰かがこのコードが機能しない理由を説明できますか:

package test
import scala.actors.Futures._

object FibFut extends Application{
    
    def fib(i:Int):Int = 
        if(i<2)
            1
        else
            fib(i-1)+fib(i-2)
            
    val f = future{ fib(3) }
    
    println(f())    
        
}

関数 fib を future-body 内で定義すると機能します。それはスコープの問題であるに違いありませんが、上記でエラーは発生せず、単にハングします。誰?

編集#2

Application を拡張するのは良い方法ではなかったようです。main メソッドを定義すると、すべてが機能しました。以下のコードは私が探していたものなので、Futuresは高く評価します :)

package test

import scala.actors.Futures._

object FibFut {

  def fib(i: Int): Int = if (i < 2) 1 else fib(i - 1) + fib(i - 2)

  def main(args: Array[String]) {

    val fibs = for (i <- 0 to 50) yield future { fib(i) }

    for (future <- fibs) println(future())

  }

}
4

3 に答える 3

3

私は個人的に「完了」メッセージのファンです。これは、作業の配分を管理するための優れた方法であり、おまけとして、すべての子供がいつ自分の作業を終了したかが既にわかっています。

しかし、実際にいくつかの作業を一度ファームアウトして、すべての準備が整うまで待ちたい場合は、 をチェックしてくださいscala.actors.Futures。いくつかの計算を行うように依頼できます。

val futureA = Futures.future {
  val a = veryExpensiveOperation
  (a,"I'm from the future!")
}

複数のリクエストを行った場合は、すべてが完了するまで待つことができます。

Futures.awaitAll(600*1000, futureA, futureB, futureC, futureD)
// Returns once all of A-D have been computed
val actualA = futureA()   // Now we get the value
于 2010-11-28T20:19:55.297 に答える
2

少し前に、Scala でのアクターのリンクに関する投稿を書きました。アクターのリンクは、Erlang、Scala アクター、およびその他のアクター ライブラリでアクターを監視する慣用的な (そして最も簡単な) 方法です。デフォルトでは、2 つのアクターをリンクし、そのうちの 1 つが死ぬと、別のアクターもすぐに死にます (アクターが終了シグナルをトラップ/処理しない限り):

scala> case object Stop
defined module Stop

scala>

scala> val actor1 = actor {
     |    loop {
     |       react {
     |          case Stop =>
     |             println("Actor 1: stop")
     |             exit()
     |          case msg => println(msg)
     |             }
     |         }
     | }
actor1: scala.actors.Actor = scala.actors.Actor$$anon$1@1feea62

scala>

scala> val actor2 = actor {
     |    link(actor1)
     |    self.trapExit = true
     |    loop {
     |       react {
     |          case msg => println(msg)
     |             }
     |         }
     | }
actor2: scala.actors.Actor = scala.actors.Actor$$anon$1@1e1c66a

scala> actor1.start
res12: scala.actors.Actor = scala.actors.Actor$$anon$1@1feea62

scala> actor2.start
res13: scala.actors.Actor = scala.actors.Actor$$anon$1@1e1c66a

scala> actor1 ! Stop
Actor 1: stop

scala> Exit(scala.actors.Actor$$anon$1@1feea62,'normal)  // Actor 2 received message, when Actor1 died

より洗練された柔軟な方法は、スーパーバイザーを使用することです (Erlang のスーパーバイザーの動作、Akka アクター ライブラリのアクター スーパーバイザーなど) スーパーバイザー (それ自体がアクターである) は、他の多くのアクターを監視し、指定された戦略に従ってそれらを再起動します (1 つのアクターが死亡した場合はすべてのアクターを再起動し、死亡した場合は 1 つのアクターのみを再起動します)。

于 2010-11-28T17:22:41.080 に答える
0

皆さん、私はアクター クラスの getState 関数を使用して解決策を考え出しました。ソリューションでは、このスレッドのアイデアを使用しました: ReactWithin(0) が使用されている Scala Actor's Mailbox を覗くための最良の方法。反応とループを使用すると、プログラムが大きな計算でブロックされるだけで問題が発生しました。これは、loop を while(true) に、reactWithin(int) を receiveWithin(int) に置き換えることで解決されました。

私の解決策は次のようになります(注意してください、bigass code-lump):

package test

import scala.actors._
import scala.actors.Actor.State._

case class Computation(index: Int, a: () ⇒ Int)
case class Result(i: String)
object Main {
  def main(args: Array[String]) {
    val m = new Master
    m.start
  }
}

class Master extends Actor {

  val N = 40
  var numberOfAnswers = 0

  def fib(x: Int): Int =
    if (x < 2)
      1
    else
      fib(x - 1) + fib(x - 2)

  val computers = for (i ← 0 to N) yield new Computer

  def act {

    for (i ← 0 until computers.size) {
      computers(i).start
      computers(i) ! Computation(i, () => fib(i))
    }

    println("done Initializing actors")
    while (true) {
      receiveWithin(1000) {

        case Result(i) =>
          val numberDone = computers.map(_.getState == Terminated).filter(_ == true).length
          println(i)
          numberOfAnswers += 1

        case TIMEOUT =>
          val allDone = computers.map(_.getState == Terminated).reduceRight(_ && _)
          println("All workers done?:" + allDone)
          println("# of answers:" + numberOfAnswers)
          if (allDone)
            exit()
      }
    }

  }

}

class Computer extends Actor {

  def act {
    loop {
      react {
        case Computation(i, f) ⇒
          sender ! Result("#" + i + " Res:" + f())
          exit()
      }
    }
  }

}

プログラムはフィボナッチ数を (最悪の方法で) 計算します。アイデアは、大きなワークロードに対して複数のスレッドの使用率をテストすることです。次の行は、一部のアクターがまだ終了していないかどうかを確認します。

computers.map(_.getState == Terminated).reduceRight(_ && _)

ここで、コンピューターは IndexedSeq[Computer] 型です。トリックは、TIMEOUT メッセージを使用して、すべての作業が完了したかどうかを定期的に確認し、それに応じて行動できることです (この場合、アクティブなワーカーがなくなったら終了します)。各ワーカーが終了する前に結果を送信するという事実を利用しています。そうすれば、結果が終了済みとして表示される前に、常に結果を受け取り、処理できることがわかります。

while(true) と receive の代わりに react と loop を使用すると、プログラムが「ロックアップ」(メッセージの受信を停止) するという事実について誰かコメントできますか?

于 2010-11-29T14:30:33.177 に答える