8

akkaActorsでフォールトトレラントな動作を取得しようとしています。私は、システム内のアクターが長時間の処理に使用できることに依存するいくつかのコードに取り組んでいます。処理が数時間後に停止し(約10時間かかるはずです)、あまり発生していないことがわかりました。私のアクターは例外から回復していないと思います。

アクターを1対1で永続的に再起動するには、何をする必要がありますか?これは、このドキュメントhttp://akka.io/docs/akka/1.1.3/scala/fault-toleranceから実行できると思います。

私はakka1.1.3とscala2.9を使用しています

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.LoadBalancer
import akka.config.Supervision._


object TestActor {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
                   .setCorePoolSize(100)
                   .setMaxPoolSize(100)
                   .build
}

class TestActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.dispatcher = TestActor.dispatcher
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
           throw new Exception("This is a simulated failure")
         println("Actor: " + name + " Received: " + num)
         //Thread.sleep(100)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  //callback method for restart handling 
  override def preRestart(reason: Throwable){
    println("TestActor "+ name + " restaring after shutdown because of " + reason)
  }

  //callback method for restart handling 
  override def postRestart(reason: Throwable){
    println("Restaring TestActor "+name+"after shutdown because of " + reason)
  }  
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
    val testActors: List[ActorRef]
    val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
     self.lifeCycle = Permanent
     self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
     val testActors: List[ActorRef]
     override def preStart = testActors foreach { self.startLink(_) }
     override def postStop = { System.out.println("postStop") }
}


  object FaultTest {
    def main(args : Array[String]) : Unit = {
      println("starting FaultTest.main()")
      val numOfActors = 5
      val supervisor = actorOf(
        new TestActorManager with CyclicLoadBalancing {
             val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
        }
      )

      supervisor.start();

      println("Number of Actors: " +  Actor.registry.actorsFor(classOf[TestActor]).length)

      val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

      (1 until 200 toList) foreach { testActor ! _ }

    }
  }

このコードは、LoadBalancerの背後に5つのアクターを設定します。これらのアクターは、障害をシミュレートするために偶数に例外をスローすることを除いて、送信される整数を出力します。整数0から200がこれらのアクターに送信されます。奇数が出力されることを期待していますが、偶数でいくつかの障害が発生すると、すべてがシャットダウンするようです。このコードをsbtで実行すると、次の出力になります。

[info] Running FaultTest 
starting FaultTest.main()
Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 2 Received: 1
Actor: 2 Received: 9
Actor: 1 Received: 3
Actor: 3 Received: 7
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM

ここで起こっていると思うのは、5人の俳優が始まり、最初の5人の偶数が彼らを廃業させ、彼らは再開されていないということです。

アクターが例外から回復するように、このコードをどのように変更できますか?

これにより、実際には1から200までのすべての奇数が出力されると思います。各アクターは偶数では失敗しますが、例外ではそのままのメールボックスで再開されると思います。preRestartとpostRestartからprintlnが表示されることを期待しています。これらのことを実現するには、このコードサンプルで何を構成する必要がありますか?

ここに、私の誤解につながる可能性のあるakkaとActorsに関するいくつかの追加の仮定があります。アクターは、受信中に例外がスローされたときに再起動され、引き続き使用できるように、スーパーバイザーまたはfaultHandlerで構成できると想定しています。アクターに送信されたメッセージは、受信中に例外をスローすると失われると想定しています。例外をスローするアクターのpreRestart()とpostRestart()が呼び出されると想定しています。

コード例は、私がやろうとしていることを表しており、Akkaでアクターへのディスパッチが縮小されている理由に基づいています。

**別のコードサンプル**

これは、より単純な別のコードサンプルです。私は偶数に例外を投げる1人の俳優を始めています。邪魔になるロードバランサーやその他のものはありません。俳優に関する情報を印刷しようとしています。メッセージがアクターに送信され、何が起こっているかを監視した後、1分間プログラムを終了するのを待っています。

これで奇数が出力されると思いますが、アクターがメールボックスにメッセージを入れて座っているように見えます。

OneForOneStrategyの設定を間違えていますか?アクターを何かにリンクする必要がありますか?この種の構成は、私の側で根本的に誤った方向に向けられていますか?ディスパッチャは、何らかの方法でフォールトトレランスを設定する必要がありますか?ディスパッチャのスレッドを台無しにすることはできますか?

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.ActorRegistry
import akka.config.Supervision._

class SingleActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
            throw new Exception("This is a simulated failure, where does this get logged?")
         println("Actor: " + name + " Received: " + num)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  override def preRestart(reason: Throwable){
    println("TestActor "+ name + " restaring after shutdown because of " + reason)
  }

  override def postRestart(reason: Throwable){
    println("Restaring TestActor "+name+"after shutdown because of " + reason)
  }  
}

object TestSingleActor{

    def main(args : Array[String]) : Unit = {
      println("starting TestSingleActor.main()")

      val testActor = Actor.actorOf( new SingleActor(1) ).start()

      println("number of actors: " + registry.actors.size)
      printAllActorsInfo

      (1 until 20 toList) foreach { testActor ! _ }

      for( i <- 1 until 120 ){
        Thread.sleep(500)
        printAllActorsInfo
      }
    }

  def printAllActorsInfo() ={
    registry.actors.foreach( (a) =>
       println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
               .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
  }
}

次のような出力が得られます:

[info] Running TestSingleActor 
starting TestSingleActor.main()
Loading config [akka.conf] from the application classpath.
number of actors: 1
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeted ...

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM
4

2 に答える 2

5

問題は、私がakka.confファイルを使用していたことでした。イベントハンドラーを構成した行を除いて、参照1.1.3akka.confファイルを使用していました。

私の(壊れたもの):

    event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

リファレンス1.1.3(機能するもの):

    event-handlers = ["akka.event.EventHandler$DefaultListener"]

私のevent-handlers設定行では、Actorの再起動は発生しません。リファレンス1.1.3を使用すると、回線の再起動が見事に行われます。

これらの手順に基づいてこの変更を行いましたhttp://akka.io/docs/akka/1.1.3/general/slf4j.html

したがって、そのページの提案を取り除き、1.1.3リファレンスakka.confに戻ることで、フォールトトレラントなアクターを取得することができました。

于 2011-08-18T17:55:58.340 に答える
1

メッセージが送信された後に問題が終了し、非同期アプリケーションを存続させようとしていないため、メインスレッドが終了し、すべてが停止すると思います。

于 2011-08-17T15:20:01.153 に答える