私は Akka for Scala を初めて使用しましたが、理解できないエラー メッセージが表示されます。私のコードは基本的に進化的アルゴリズムを実行します。ある世代の間に作成された新しい個体ごとに、それを評価する必要があります。それらの個人の評価は、私が並列化したいものです (そして、長期的には、クラスターでそれを行いたいと考えています)。これを実装するために、コードを Akka Web サイトの Getting Started Tutorial に完全に向けました。コードは次のようになります。
object Configuration {
sealed trait Message
case object Broadcast extends Message
case class Work(inds:ArrayBuffer[Individual]) extends Message
case class Result(result:ArrayBuffer[Individual]) extends Message
case class NewIndividuals(inds:ArrayBuffer[Individual]) extends Message
class Worker extends Actor {
val id = (Math.random * 100).asInstanceOf[Int]
def receive = {
case Work(inds) =>
sender ! Result(geneticOperation(inds)) // perform the work
}
private def geneticOperation(inds:ArrayBuffer[Individual]) = {
for(child <- inds) {
child.mutate
child.evaluate
println(id + ": I've evaluated one.")
}
println("\n")
inds
}
}
class Master(nrOfWorkers:Int, nrOfMessages:Int, oldInds:ArrayBuffer[Individual], listener:ActorRef)
extends Actor {
var inds:ArrayBuffer[Individual] = _
var nrOfResults:Int = _
val start:Long = System.currentTimeMillis
val workerRouter = context.actorOf(
Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
def receive = {
case Broadcast =>
var workOn = new ArrayBuffer[Individual]
for(i <- 0 until nrOfMessages)
workOn += oldInds(i)
oldInds.trimStart(nrOfMessages)
workerRouter ! Work(workOn)
case Result(newInds) =>
inds ++= newInds
nrOfResults += 1
println(nrOfResults)
if(nrOfResults == nrOfMessages) {
listener ! NewIndividuals(inds)
context.stop(self)
}
}
}
class Listener extends Actor {
def receive = {
case NewIndividuals(inds) =>
Configuration.environment(inds)
context.system.shutdown()
}
}
var d = Parameter.N_DROPLET_TYPE-2
/** Apply genetic operators (recombination, mutation) on the population
* during the current generation.
* @param g Current generation.
*/
def apply(g:Int) {
var children = Population.recombinate // create offspring
calculate(Parameter.NODES,(Parameter.L / Parameter.NODES),children)
}
def calculate(nrOfWorkers:Int, nrOfMessages:Int, inds:ArrayBuffer[Individual]) {
val system = ActorSystem("GeneticOperations")
val listener = system.actorOf(Props[Listener], name="listener")
val master = system.actorOf(Props(new Master(
nrOfWorkers,nrOfMessages,inds,listener)),
name="master")
master ! Broadcast
}
private def environment(children:ArrayBuffer[Individual]) {
if(Parameter.NEUTRAL) { // if neutral evolution no selection pressure
Population.neutralEvolution(children)
writeDiversity
}
else Population.select(children)
writeAllToEvoPath
}
実行すると、次の出力が得られます。
ID akka
Population setup complete.
Evolution: Gen 1
Gen 2
Gen 3
Gen 4
Gen 5
Gen 6
Gen 7
Gen 8
Gen 9
107 seconds elapsed.
79: I've evaluated one.
67: I've evaluated one.
89: I've evaluated one.
56: I've evaluated one.
17: I've evaluated one.
55: I've evaluated one.
77: I've evaluated one.
15: I've evaluated one.
57: I've evaluated one.
79: I've evaluated one.
[ERROR] [05/03/2012 12:15:01.864] [GeneticOperations-akka.actor.default-dispatcher-6] [akka://GeneticOperations/user/master] null
java.lang.NullPointerException
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75)
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:67)
at akka.actor.Actor$class.apply(Actor.scala:311)
at framework.Configuration$Master.apply(Configuration.scala:57)
at akka.actor.ActorCell.invoke(ActorCell.scala:619)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:196)
at akka.dispatch.Mailbox.run(Mailbox.scala:178)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:974)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
55: I've evaluated one.
[ERROR] [05/03/2012 12:15:04.339] [GeneticOperations-akka.actor.default-dispatcher-4] [akka://GeneticOperations/user/master] null
java.lang.NullPointerException
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75)
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:67)
at akka.actor.Actor$class.apply(Actor.scala:311)
at framework.Configuration$Master.apply(Configuration.scala:57)
at akka.actor.ActorCell.invoke(ActorCell.scala:619)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:196)
at akka.dispatch.Mailbox.run(Mailbox.scala:178)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:974)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
(など)
世代 (gen) ごとに評価を行う必要がありますが、代わりに、私のコードはマスター アクターが結果を取得するのを待たずに処理を続けているようです。エラーメッセージは、マスターに NullPointerException があるように見えますか? しかし、なぜそれが起こるべきなのかわかりません。Akka のチュートリアルでは、マスターがワーカーからすべての結果を受け取るまで待機すると主張しています (これが私が望んでいることです)。
役立つものは何でも大歓迎です!(私は Akka のまったくの初心者であることにご注意ください。)
アップデート:
1) framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75): inds ++= newInds (Master、receive、case Result 内)
2) Parameter の変数は単なる整数です。この場合、NODES = 4、L = 8 です。