73

I am currently trying to get started with Akka and I am facing a weird problem. I've got the following code for my Actor:

class AkkaWorkerFT extends Actor {
  def receive = {
    case Work(n, c) if n < 0 => throw new Exception("Negative number")
    case Work(n, c) => self reply n.isProbablePrime(c);
  }
}

And this is how I start my workers:

val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()

And this is how I shut everything down:

futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill

Now what happens is if I send the workers messages with n > 0 (no exception is thrown), everything works fine and the application shuts down properly. However, as soon as I send it a single message which results in an exception, the application does not terminate because there is still an actor running, but I can't figure out where it comes from.

In case it helps, this is the stack of the thread in question:

  Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended) 
    Unsafe.park(boolean, long) line: not available [native method]  
    LockSupport.park(Object) line: 158  
    AbstractQueuedSynchronizer$ConditionObject.await() line: 1987   
    LinkedBlockingQueue<E>.take() line: 399 
    ThreadPoolExecutor.getTask() line: 947  
    ThreadPoolExecutor$Worker.run() line: 907   
    MonitorableThread(Thread).run() line: 680   
    MonitorableThread.run() line: 182   

PS: The thread which is not terminating isn't any of the worker threads, because I've added a postStop callback, every one of them stops properly.

PPS: Actors.registry.shutdownAll workarounds the problem, but I think shutdownAll should only be used as a last resort, shouldn't it?

4

3 に答える 3

23

akka アクター内の問題を処理する適切な方法は、例外をスローするのではなく、スーパーバイザー階層を設定することです

「並行コードで例外をスローすると (リンクされていないアクターを使用していると仮定しましょう)、現在アクターを実行しているスレッドが単純に破壊されます。

問題が発生したことを確認する方法はありません (スタック トレースを検査する以外に)。それについてあなたができることは何もありません。」

スーパーバイザー階層による耐障害性 (1.2)を参照してください。

* 注意 *上記は Akka の古いバージョン (1.2) に当てはまります。例えば

class Child extends Actor {
    var state = 0
    def receive = {
      case ex: Exception ⇒ throw ex
      case x: Int        ⇒ state = x
      case "get"         ⇒ sender ! state
    }
  }

そしてスーパーバイザーで:

class Supervisor extends Actor {
    import akka.actor.OneForOneStrategy
    import akka.actor.SupervisorStrategy._
    import scala.concurrent.duration._

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
        case _: ArithmeticException      ⇒ Resume
        case _: NullPointerException     ⇒ Restart
        case _: IllegalArgumentException ⇒ Stop
        case _: Exception                ⇒ Escalate
      }

    def receive = {
      case p: Props ⇒ sender ! context.actorOf(p)
    }
  }

スーパーバイザー階層による耐障害性 (2.2)を参照してください。

于 2011-11-11T18:34:30.417 に答える
2

Viktorによって提案されたように、ログをオフにして物事が確実に終了するようにするのは少し奇妙です。代わりにできることは次のとおりです。

EventHandler.shutdown()

これにより、例外の後にワールドを実行し続けるすべての(ロガー)リスナーがクリーンにシャットダウンされます。

def shutdown() {
  foreachListener(_.stop())
  EventHandlerDispatcher.shutdown()
}
于 2012-01-05T19:36:25.067 に答える
-3

ロガーの電源を入れますakka.conf

于 2011-11-09T09:17:11.557 に答える