4

私は経験豊富な Java プログラマーで、アクター ベースの Scala アプリケーションの開発を始めています。私が現在開発中のアプリケーションでは、自律的および反応的な動作の両方を示す Sender アクターの実装に対処する必要があります。シナリオは次のとおりです (疑似コード)。

Actor Sender{

        Active behavior (must be active once the actor boots):
            do-in-sequence{
                send to Stdout A
                send to Stdout B
                send to Stdout C
                send stop to Stdout and then exit
            }

        Reactive behavior (must be active once the actor boots):
            as soon as received stop from StopNotifier -> send stop to Stdout and then exit
    }
}

Actor Stdout{
    Purely reactive behavior (i.e. wait for msg){
        as soon as received A -> print A
        as soon as received B -> print B
        as soon as received C -> print C
        as soon as received stop from Sender -> exit
    }
}
Actor StopNotifier
    Purely active behavior {
        compute, and when some condition is met -> send stop to Sender
    }

私の質問は、自律性と反応性を統合する必要がある Scala アクターの自律的な動作を表現する最良の方法はどれですか (このペーパーで説明されているように)?

言い換えれば、上記の例で Sender アクターをコーディングする最良の方法/スタイルはどれですか?

私は解決策を思いつきました(以下に報告されています)が、私はスカラの第一人者ではないので(まだ:))、実装したものがより良い/より良い解決策で改善できるかどうか知りたいです.

case object START
case object A
case object B
case object C
case object SENT_A
case object SENT_B
case object ACK_A
case object ACK_B
case object ACK_C
case object STOP

class Sender(stdout: Stdout) extends Actor {
    def act() {
        self!START
        while (true){
            receive {
                case START =>
                    stdout!?A
                    self!SENT_A
                case SENT_A =>
                    stdout!?B
                    self!SENT_B
                case SENT_B =>
                    stdout!?C
                    stdout!?STOP
                    exit()
                case STOP => {
                    Console.println("[Sender:]Received STOP, terminating")
                    stdout!?STOP
                    exit()
                }
            }
        }
    }
}

class Stdout() extends Actor {
    def act() { 
        while (true) {
            receive{
                case A =>
                    Console.println("[Stdout:]A")
                    reply(ACK_A)
                case B =>
                    Console.println("[Stdout:]B")
                    reply(ACK_B)
              case C =>
                    Console.println("[Stdout:]C")
                    reply(ACK_C)
                    exit()
              case STOP =>
                    Console.println("[Stdout:]Received STOP, terminating")
                    exit()
            }
        }
    }
}

class StopNotifier(sender: Sender) extends Actor {
    def act() {
        /*
         * The idea is that the StopNotifier should send a STOP message to the Sender
         * when a certain condition is met.
         * The sleep used here is just a semplification, since the detection of such
         * a condition is not relevant for the example.
         */

        Thread.sleep(200)
        Console.println("[StopNotifier:]Sending STOP to sender")
        sender ! STOP
        exit()
   }
}

object app extends Application {
    val stdout = new Stdout
    stdout.start
    val sender = new Sender(stdout)
    sender.start
    val stopNotifier = new StopNotifier(sender)
    stopNotifier.start
}

特に、現在の実装で気になるのは、StopNotifier からの STOP メッセージの受信に迅速に対応できるようにするために、Sender の各実行ステップでメッセージを自己送信する必要があったという事実です(つまり、 A、B を Stdout アクターに送信します)。物事を行う正しい方法であるには、私にはトリッキーすぎるようです:)。

私は他のscala言語構造(非同期送信、反応など)を使用して他のソリューションを開発しようとしましたが、何らかの形で他の問題/トリックに影響を与えているように見えました.

scala アクターの自律性とリアクティブ動作の統合を処理するためのより良いソリューションを持っている人はいますか?

4

2 に答える 2

4

私の理解が正しければ、代わりに Akka アクター、具体的には Akka FSM を使用して、送信者をステート マシンとしてモデル化する必要があります。whenUnhandledAkka アクターには停止メカニズムが組み込まれています。または、ハンドラーを介してすべての状態から処理できる独自のメッセージをいつでも使用できます。

http://doc.akka.io/docs/akka/snapshot/scala/fsm.htmlを参照してください。

これは明らかにやり過ぎですが、もっと複雑なことをしようとしていると思います。特定のメッセージを受信したときではなく、終了したときに終了するようにStdout「監視」することもできます。Lifecycle Monitoring 別名 DeathWatchを参照してください。SenderSender

package fsmTest

import akka.actor._
import akka.util.duration._

sealed trait Msg
case object A extends Msg
case object B extends Msg
case object C extends Msg

sealed trait SenderState
case object Started extends SenderState
case object SentA extends SenderState
case object SentB extends SenderState

case class SenderData()

class Sender(stdout: ActorRef) 
  extends Actor 
  with FSM[SenderState, SenderData] {

  case object GoNextState

  startWith(Started, SenderData())

  when(Started) {
    case Event(GoNextState, data) => {
      stdout ! A
      goto(SentA) using data
    }
  }

  when(SentA) {
    case Event(GoNextState, data) => {
      stdout ! B
      goto(SentB) using data
    }
  }

  when(SentB) {
    case Event(GoNextState, data) => {
      stdout ! C
      goto(Started) using data
    }
  }

//      //Handle messages which aren't explicitly handled in state here
//      whenUnhandled {
//        case Event(SomeCustomStop, data) => {
//          stop(FSM.Shutdown)
//        }
//      }

  setTimer("goNextState", GoNextState, 1 second, repeat = true)

  initialize
}

class Stdout() extends Actor {
  def receive = {
    case msg: Msg => {
      context.watch(sender) //Not sure if you're gonna want to do this here, but you get the point
      println(msg)
    }
    case Terminated(_) => context.stop(self)
  }
}

object FSMTest extends App {

  implicit val system = ActorSystem("Testing")
  val stdout = system.actorOf(Props[Stdout], "stdout")
  val sender = system.actorOf(Props(new Sender(stdout)), "sender")

  system.scheduler.scheduleOnce(5 seconds) {
    system.stop(sender)
    system.shutdown()
  }
  system.awaitTermination(10 seconds)
}

送信者に状態を実装する方法に関係なく、アクターを使用してモデル化する場合は、イベント処理または上記のタイマーを使用して、メッセージを「自己送信」する必要があると思います。

于 2012-07-10T17:41:59.783 に答える
1

1 つの反応または受信ブロック内の 1 つのアクターから順番に送信されたメッセージは、その順番で受信されます。(他のアクターからの他のメッセージが散在している可能性がありますが、A の次に B を送信し、B の次に A を取得することはありません。)

だからあなたはただ

stdout ! A
stdout ! B
stdout ! C

他にも何かをする必要がない限り。

于 2012-07-10T18:52:35.577 に答える