2

次のトップレベル (「最も親」) なアクターがあります。

// Groovy pseudo-code
class Master extends UntypedActor {
    ActorRef child1
    ActorRef child2
    ActorRef child3
    ActorRef backup

    @Override
    void onReceive(Object message) throws Exception {
        if(message instanceof Terminated) {
            Terminated terminated = message as Terminated
            if(terminated.actor != backup) {
                terminated.actor = backup
            } else {
                // TODO: What to do here? How to escalate from here?
            }
        } else {
            child1.tell(new DoSomething(message), getSelf())
            child2.tell(new DoSomethingElse(message), getSelf())
            child3.tell(new DoSomethingElser(message, getSelf())
        }
    }

    @Override
    SupervisorStrategy supervisorStrategy() {
        new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
            @Override
            Directive apply(Throwable t) throws Exception {
                if(isRecoverable(t) {   // Don’t worry about how/where this is defined or how it works
                    SupervisorStrategy.stop()
                } else {
                    SupervisorStrategy.escalate()
                }
            }
        })
    }
}

ご覧のとおり、3 つの子を監視し、それら 3 つの子が「回復可能な」例外をスローすると、それらは停止され、バックアップに置き換えられます。ここまでは順調ですね。

私が今直面している問題は、バックアップ アクターがスロー可能なものをスローした場合、このMasterアクター (そして実際には、私のアプリ全般) が入力の処理を続行できない状態にあると見なし、エスカレートすることです。保護者レベルの例外。

私は Akka を初めて使用するので、このコードをどこに配置すればよいか、どのように見えるべきかわかりません。繰り返しますが、次のようなロジックが必要です。

  • Masterバックアップ アクターがスロー可能なものをスローした場合、例外をの親にエスカレートします。これは、実際には Akka “<a href="http://doc.akka.io/docs/akka/snapshot/general/supervision.html " rel="nofollow">guardian" アクター/コンストラクト

これの最初の部分は、例外がバックアップからいつスローされるかを知る必要があるということです。この部分は処理できるので、戦略が次のようになったとしましょう。

@Override
SupervisorStrategy supervisorStrategy() {
    new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
        @Override
        Directive apply(Throwable t) throws Exception {
            if(wasThrownFromBackup(t)) {
                SupervisorStrategy.escalate()
            } else if(isRecoverable(t) {
                SupervisorStrategy.stop()
            } else {
                SupervisorStrategy.escalate()
            }
        }
    })
}

しかし、ご覧のとおり、「アクター システムから」エスカレーションを実装するのにまだ苦労しています。アイデア?Scala は私には象形文字のように見えるので、Java コードの例が非常に好まれます。

4

1 に答える 1

2

ここで 'Reaper' パターンを見てください http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2申し訳ありませんが、これは Scala にありますが、Java に変換するのは簡単だと思います。

こちらもご覧ください https://groups.google.com/forum/#!topic/akka-user/QG_DL7FszMU

構成で設定する必要があります

akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"

これにより、エスカレートする「トップレベル」のアクターがシステムによって停止されます。次に、ジョブが 1 つだけの「Reaper」(または任意の名前) と呼ばれる別のトップ レベル アクターを実装し、メインのトップ レベル アクターを監視してcontext.system.shutdown()、トップ レベル アクターが停止したときにアクション (例: ) を実行します。

私は akka Java API を知らないので、正確な例を提供することはできませんが、Scala では、上記の LetItCrash ブログから、次のようになります。

import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method.  It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}

class ProductionReaper extends Reaper {
  // Shutdown
  def allSoulsReaped(): Unit = context.system.shutdown()
}

アプリケーションの起動時に、マスター アクターを作成し、リーパーを作成して、リーパーにWatchMe(masterActor)メッセージを送信します。

于 2015-07-28T08:05:55.860 に答える