5

さまざまなタイプのメッセージをイベントストリームに公開する必要があります。たとえば、タイプAのメッセージが10個投稿され、タイプBのメッセージが1つ投稿され、Bの優先度がより高い場合、それらのメッセージの優先度は異なる必要があります。 Aの優先度-タイプAのメッセージがキューに10個ある場合でも、メッセージBは次のアクターによって取得される必要があります。

ここで優先メッセージについて読み、そのメールボックスの簡単な実装を作成しました。

  class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

    PriorityGenerator {
      case ServerPermanentlyDead => println("Priority:0"); 0
      case ServerDead => println("Priority:1"); 1
      case _ => println("Default priority"); 10
    }

  )

次に、application.confで構成しました

akka {

    actor {

        prio-dispatcher {
            type = "Dispatcher"
            mailbox-type = "mailbox.PrioritizedMailbox"
        }

    }

}

そして私の俳優に配線されました:

private val myActor = actors.actorOf(
  Props[MyEventHandler[T]].
    withRouter(RoundRobinRouter(HIVE)).
    withDispatcher("akka.actor.prio-dispatcher").
    withCreator(
    new Creator[Actor] {
      def create() = new MyEventHandler(storage)
    }), name = "eventHandler")

メッセージを送信するためにActorSystem.eventStream.publishを使用しており、アクターはそれにサブスクライブされています(メッセージが処理されていることをログで確認できますが、FIFO順になっています)。

ただし、ログ/コンソールで「デフォルトの優先度」のようなメッセージを見たことがないため、十分ではないようです。ここで何かが足りませんか?説明されているアプローチは、イベントストリームで機能しますか、それともアクターにメッセージを送信する直接呼び出しで機能しますか?また、eventStreamを使用して優先メッセージを取得するにはどうすればよいですか?

4

1 に答える 1

10

問題は、アクターがめちゃくちゃ速いので、メッセージがキューに入る前に処理されるため、メールボックスで優先順位を付けることができないことです。以下の例は、その要点を証明しています。

  trait Foo 
  case object X extends Foo 
  case object Y extends Foo 
  case object Z extends Foo 

  class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config) 
extends UnboundedPriorityMailbox( 
    PriorityGenerator { 
      case X ⇒ 0 
      case Y ⇒ 1 
      case Z ⇒ 2 
      case _ ⇒ 10 
    }) 

val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString( 
        """ prio-dispatcher { 
        type = "Dispatcher" 
          mailbox-type = "%s" 
        }""".format(classOf[PrioritizedMailbox].getName))) 
      val latch = new java.util.concurrent.CountDownLatch(1) 
      val a = s.actorOf(Props(new akka.actor.Actor { 
        latch.await // Just wait here so that the messages are queued up 
inside the mailbox 
        def receive = { 
          case any ⇒ /*println("Processing: " + any);*/ sender ! any 
        } 
      }).withDispatcher("prio-dispatcher")) 
      implicit val sender = testActor 
      a ! "pig" 
      a ! Y 
      a ! Z 
      a ! Y 
      a ! X 
      a ! Z 
      a ! X 
      a ! "dog" 

      latch.countDown() 

      Seq(X, X, Y, Y, Z, Z, "pig", "dog") foreach { x => expectMsg(x) } 
      s.shutdown() 

このテストは飛行色で合格します

于 2012-08-31T11:41:47.803 に答える