1

私はErlang/Elixirに精通しており、プロセスのメールボックスにあるメッセージは一致するまでメールボックスに残ります:

パターンPatternは、メールボックス内の最初のメッセージと時間順に照合され、次に 2 番目のメッセージと照合されます。一致が成功し、オプションのガード シーケンスGuardSeqが true の場合、対応するBodyものが評価されます。一致するメッセージは消費されます。つまり、メールボックスから削除されますが、メールボックス内の他のメッセージは変更されません。

( http://erlang.org/doc/reference_manual/expressions.html#receive )

ただし、Akka Actors では、一致しないメッセージはメールボックスから削除されます。たとえば、食事の哲学者のシミュレーションでフォークを実装する場合、これは面倒です。

import akka.actor._

object Fork {
  def props(id: Int): Props = Props(new Fork(id))
  final case class Take(philosopher: Int)
  final case class Release(philosopher: Int)
  final case class TookFork(fork: Int)
  final case class ReleasedFork(fork: Int)
}

class Fork(val id: Int) extends Actor {
  import Fork._

  object Status extends Enumeration {
    val FREE, TAKEN = Value
  }

  private var _status: Status.Value = Status.FREE
  private var _held_by: Int = -1

  def receive = {
    case Take(philosopher) if _status == Status.FREE => {
      println(s"\tPhilosopher $philosopher takes fork $id.")
      take(philosopher)
      sender() ! TookFork(id)
      context.become(taken, false)
    }
    case Release(philosopher) if _status == Status.TAKEN && _held_by == philosopher => {
      println(s"\tPhilosopher $philosopher puts down fork $id.")
      release()
      sender() ! ReleasedFork(id)
      context.unbecome()
    }
  }

  def take(philosopher: Int) = {
    _status  = Status.TAKEN
    _held_by = philosopher
  }

  def release() = {
    _status  = Status.FREE
    _held_by = -1
  }
}

Take(<philosopher>)メッセージがフォークに送信されると、フォークが解放されてメッセージが一致するまで、メッセージがメールボックスに留まるようにします。ただし、Akka ではTake(<philosopher>)、フォークが現在行われている場合、一致がないため、メッセージはメールボックスから削除されます。

現在、この問題を解決するには、Fork アクターのメソッドをオーバーライドしunhandled、メッセージを再度 fork に転送します。

override def unhandled(message: Any): Unit = {
  self forward message
}

一致するまでメッセージをフォークに送信し続けるため、これは非常に非効率的だと思います。一致しないメッセージを継続的に転送しないで、この問題を解決する別の方法はありますか?

ここで説明されているように、最悪の場合、Erlang メールボックスを模倣するカスタム メールボックス タイプを実装する必要があると思います: http://ndpar.blogspot.com/2010/11/erlang-explained-selective-receive.html


編集: ティムのアドバイスに基づいて実装を変更し、提案どおりに Stash トレイトを使用します。私Forkのアクターは次のようになります。

class Fork(val id: Int) extends Actor with Stash {
  import Fork._

  // Fork is in "taken" state
  def taken(philosopher: Int): Receive = {
    case Release(`philosopher`) => {
      println(s"\tPhilosopher $philosopher puts down fork $id.")
      sender() ! ReleasedFork(id)
      unstashAll()
      context.unbecome()
    }
    case Take(_) => stash()
  }

  // Fork is in "free" state
  def receive = {
    case Take(philosopher) => {
      println(s"\tPhilosopher $philosopher takes fork $id.")
      sender() ! TookFork(id)
      context.become(taken(philosopher), false)
    }
  }
}

stash()ただし、 andunstashAll()呼び出しをどこにでも書きたくありません。代わりに、これを行うカスタムのメールボックス タイプを実装したいと考えています。つまり、未処理のメッセージを隠しておき、アクターによってメッセージが処理されたときにそれらをアンスタッシュします。これは可能ですか?

これを行うカスタム メールボックスを実装しようとしましたが、メッセージが受信ブロックと一致したかどうかを判断できません。

4

2 に答える 2

1

問題forwardは、処理待ちのメッセージが複数ある場合にメッセージの順序を変更する可能性があることです。これはおそらく良い考えではありません。

ここでの最善の解決策は、必要なセマンティクスを提供する独自のキューをアクター内に実装することです。メッセージをすぐに処理できない場合は、キューに入れ、次のメッセージが到着したときに、可能な限り多くのキューを処理できます。これにより、送信者が一貫性のないメッセージを送信したことを検出することもできます (たとえば、送信者が送信ReleaseしなかったフォークでTake)。

それが問題であることを証明できるまで効率について心配することはありませんが、各受信関数がその特定の状態に関連するメッセージのみを処理する場合は、より効率的になります。


varメソッドのパラメーターに状態を入れることで、アクターでの使用を避けreceiveます。また、_status値は受信ハンドラーの選択において暗黙的であり、値として保存する必要はありません。taken受信ハンドラーはReleaseメッセージのみを処理する必要があり、メインの受信ハンドラーはメッセージのみを処理する必要がありますTake

于 2018-12-19T09:55:44.690 に答える