1

私は Akka for Scala を初めて使用しましたが、理解できないエラー メッセージが表示されます。私のコードは基本的に進化的アルゴリズムを実行します。ある世代の間に作成された新しい個体ごとに、それを評価する必要があります。それらの個人の評価は、私が並列化したいものです (そして、長期的には、クラスターでそれを行いたいと考えています)。これを実装するために、コードを Akka Web サイトの Getting Started Tutorial に完全に向けました。コードは次のようになります。

object Configuration {

sealed trait Message
case object Broadcast extends Message
case class Work(inds:ArrayBuffer[Individual]) extends Message
case class Result(result:ArrayBuffer[Individual]) extends Message
case class NewIndividuals(inds:ArrayBuffer[Individual]) extends Message

class Worker extends Actor {

    val id = (Math.random * 100).asInstanceOf[Int]

    def receive = {
        case Work(inds) =>
            sender ! Result(geneticOperation(inds)) // perform the work
    }

    private def geneticOperation(inds:ArrayBuffer[Individual]) = {
        for(child <- inds) {
            child.mutate
            child.evaluate
            println(id + ": I've evaluated one.")
        }
        println("\n")
        inds
    }

}

class Master(nrOfWorkers:Int, nrOfMessages:Int, oldInds:ArrayBuffer[Individual], listener:ActorRef)
    extends Actor {

    var inds:ArrayBuffer[Individual] = _ 
    var nrOfResults:Int = _
    val start:Long = System.currentTimeMillis

    val workerRouter = context.actorOf(
            Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")

    def receive = {
        case Broadcast =>
            var workOn = new ArrayBuffer[Individual]    
            for(i <- 0 until nrOfMessages)
                workOn += oldInds(i)
            oldInds.trimStart(nrOfMessages)
            workerRouter ! Work(workOn)
        case Result(newInds) =>
            inds ++= newInds
            nrOfResults += 1
            println(nrOfResults)
            if(nrOfResults == nrOfMessages) {
                listener ! NewIndividuals(inds)
                context.stop(self)
            }
    }

}

class Listener extends Actor {
    def receive = {
        case NewIndividuals(inds) =>
            Configuration.environment(inds)
            context.system.shutdown()
    }
}

var d = Parameter.N_DROPLET_TYPE-2

  /** Apply genetic operators (recombination, mutation) on the population 
   * during the current generation.
   *  @param g Current generation.
   */
def apply(g:Int) {
    var children = Population.recombinate   // create offspring
    calculate(Parameter.NODES,(Parameter.L / Parameter.NODES),children)
}

def calculate(nrOfWorkers:Int, nrOfMessages:Int, inds:ArrayBuffer[Individual]) {
    val system = ActorSystem("GeneticOperations")
    val listener = system.actorOf(Props[Listener], name="listener")
    val master = system.actorOf(Props(new Master(
            nrOfWorkers,nrOfMessages,inds,listener)),
            name="master")
    master ! Broadcast
}

    private def environment(children:ArrayBuffer[Individual]) {
      if(Parameter.NEUTRAL) {       // if neutral evolution no selection pressure
          Population.neutralEvolution(children)
          writeDiversity
      }
      else Population.select(children)
      writeAllToEvoPath
}

実行すると、次の出力が得られます。

ID akka
Population setup complete.
Evolution: Gen 1
Gen 2
Gen 3
Gen 4
Gen 5
Gen 6
Gen 7
Gen 8
Gen 9


107 seconds elapsed.
79: I've evaluated one.
67: I've evaluated one.
89: I've evaluated one.
56: I've evaluated one.
17: I've evaluated one.
55: I've evaluated one.
77: I've evaluated one.
15: I've evaluated one.
57: I've evaluated one.
79: I've evaluated one.


[ERROR] [05/03/2012 12:15:01.864] [GeneticOperations-akka.actor.default-dispatcher-6] [akka://GeneticOperations/user/master] null
java.lang.NullPointerException
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75)
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:67)
at akka.actor.Actor$class.apply(Actor.scala:311)
at framework.Configuration$Master.apply(Configuration.scala:57)
at akka.actor.ActorCell.invoke(ActorCell.scala:619)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:196)
at akka.dispatch.Mailbox.run(Mailbox.scala:178)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:974)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

55: I've evaluated one.


[ERROR] [05/03/2012 12:15:04.339] [GeneticOperations-akka.actor.default-dispatcher-4] [akka://GeneticOperations/user/master] null
java.lang.NullPointerException
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75)
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:67)
at akka.actor.Actor$class.apply(Actor.scala:311)
at framework.Configuration$Master.apply(Configuration.scala:57)
at akka.actor.ActorCell.invoke(ActorCell.scala:619)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:196)
at akka.dispatch.Mailbox.run(Mailbox.scala:178)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:974)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

(など)

世代 (gen) ごとに評価を行う必要がありますが、代わりに、私のコードはマスター アクターが結果を取得するのを待たずに処理を続けているようです。エラーメッセージは、マスターに NullPointerException があるように見えますか? しかし、なぜそれが起こるべきなのかわかりません。Akka のチュートリアルでは、マスターがワーカーからすべての結果を受け取るまで待機すると主張しています (これが私が望んでいることです)。

役立つものは何でも大歓迎です!(私は Akka のまったくの初心者であることにご注意ください。)

アップデート:

1) framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75): inds ++= newInds (Master、receive、case Result 内)

2) Parameter の変数は単なる整数です。この場合、NODES = 4、L = 8 です。

4

0 に答える 0