3

プロデューサーアクターは、すぐに処理するために別のアクターにメッセージを投稿できますか?つまり、コンシューマーメールボックスの末尾ではなく、コンシューマーメールボックスの先頭にメッセージを投稿しますか?

akkaは、自分で定義したメールボックスタイプを構成する方法を提供しますが、あるタイプのメッセージをテールではなくメールボックスの先頭に投稿する必要があるかどうかを制御する方法を提供します。例:TimerMessages。時間枠を実装するための正確なタイマー制御が必要です。メッセージは(たとえば)1000ミリ秒だけ保持する必要があり、メッセージ処理に時間がかかり、mailBoxに保留中のメッセージが多数ある場合は、タイマーメッセージを同じキューに追加したくありません。

を使用することもできますがPriorityMailBoxPriorityMailBoxメールボックスの先頭に優先度の高いメッセージ(タイマーメッセージ)を配置できても、同じ優先度のメッセージの場合、メールボックス内のメッセージの順序が同じであるとは限りません。到着の。そのため、priorityMailBoxも使用できません。

誰かが私がこの振る舞いを達成する方法を教えてもらえますか?

4

2 に答える 2

4

メッセージの到着時間を処理し、それを追加の優先度として使用できる独自のPriorityMailBoxものを使用できます(同じ「メイン」優先度のメッセージ用)。

このようなもの(テストされていません):

import akka.dispatch._
import com.typesafe.config.Config
import akka.actor.{ActorRef, PoisonPill, ActorSystem}
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

class MyTimedPriorityMailbox(settings: ActorSystem.Settings, config: Config)
  extends UnboundedTimedPriorityMailbox(
    TimedPriorityGenerator {
      case 'highpriority ⇒ 0

      case 'lowpriority  ⇒ 2

      case PoisonPill    ⇒ 3

      case otherwise     ⇒ 1
    })

case class TimedEnvelope(envelope: Envelope) {
  private val _timestamp = System.nanoTime()
  def timestamp = _timestamp
}

class UnboundedTimedPriorityMailbox( final val cmp: Comparator[TimedEnvelope], final val initialCapacity: Int) extends MailboxType {
  def this(cmp: Comparator[TimedEnvelope]) = this(cmp, 11)
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new PriorityBlockingQueue[TimedEnvelope](initialCapacity, cmp) with TimedQueueBasedMessageQueue with TimedUnboundedMessageQueueSemantics {
      override def queue: java.util.Queue[TimedEnvelope] = this
    }
}

trait TimedQueueBasedMessageQueue extends MessageQueue {
  def queue: java.util.Queue[TimedEnvelope]
  def numberOfMessages = queue.size
  def hasMessages = !queue.isEmpty
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
    if (hasMessages) {
      var envelope = dequeue()
      while (envelope ne null) {
        deadLetters.enqueue(owner, envelope)
        envelope = dequeue()
      }
    }
  }
}

trait TimedUnboundedMessageQueueSemantics extends TimedQueueBasedMessageQueue {
  def enqueue(receiver: ActorRef, handle: Envelope) { queue add TimedEnvelope(handle) }
  def dequeue(): Envelope = Option(queue.poll()).map(_.envelope).getOrElse(null)
}


object TimedPriorityGenerator {
  def apply(priorityFunction: Any ⇒ Int): TimedPriorityGenerator = new TimedPriorityGenerator {
    def gen(message: Any): Int = priorityFunction(message)
  }
}


abstract class TimedPriorityGenerator extends java.util.Comparator[TimedEnvelope] {
  def gen(message: Any): Int

  final def compare(thisMessage: TimedEnvelope, thatMessage: TimedEnvelope): Int = {
    val result = gen(thisMessage.envelope.message) - gen(thatMessage.envelope.message)
    // Int.MaxValue / Int.MinValue check omitted
    if(result == 0) (thisMessage.timestamp - thatMessage.timestamp).toInt else result
  }

}
于 2013-03-11T12:22:17.297 に答える
2

上記のコードは正常に動作します。

詳細のみ。System.getTimeNano() の使用は避けてください。CPUごとのロジックで定義されているため、マルチコアマシンでは問題があります

ここで別の投稿

次に、メッセージの順序で奇妙な動作が発生します。これは、どの cpu がエンキューするかに依存します。

従来の System.currentTimeMillis() で変更します。あまり正確ではありませんが、私たちのケースでは、同じ優先度と同じミリ秒の生成時間を持つ 2 つのメッセージが処理される順序は気にしません。

コードをありがとう!

于 2013-08-06T15:48:13.380 に答える