12

Akka のドキュメントで、囲んでいるアクターから変数を閉じるのは危険であると読みました。

警告

この場合、包含アクターの参照を閉じないように注意する必要があります。つまり、匿名の Actor クラス内から包含アクターのメソッドを呼び出さないでください。これにより、アクターのカプセル化が壊れ、同期バグや競合状態が発生する可能性があります。これは、他のアクターのコードが同封のアクターに対して同時にスケジュールされるためです。

現在、2 つのアクターがあり、そのうちの 1 つは 2 番目のアクターに何かを要求し、その結果に対して何かを行います。私がまとめた以下の例では、アクターAccumulatorがアクターNumberGeneratorから数値を取得してそれらを合計し、途中で合計を報告しています。

これは、この例が 2 つの異なる受信関数 ( A vs B )で示しているように、少なくとも 2 つの異なる方法で実行できます。2 つの違いは、Aがカウンター変数を閉じないことです。代わりに、整数を待って合計し、Bはカウンターを超えてクローズして合計を行うFutureを作成します。これがどのように機能するかを正しく理解していれば、これは onSuccess を処理するためだけに作成された匿名のアクター内で発生します。

import com.esotericsoftware.minlog.Log

import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import akka.util.duration._

case object Start
case object Request


object ActorTest {
  var wake = 0

  val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
  val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")

  Log.info("ActorTest", "Starting !")

  accRef ! Start
}

class Accumulator extends Actor {
  var counter = 0

  implicit val timeout = Timeout(5 seconds)

  // A: WITHOUT CLOSURE
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
  }
  // B: WITH CLOSURE
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess {
      case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
    }
  }
}

class NumberGenerator extends Actor {
  val rand = new java.util.Random()

  def receive = {
    case Request => sender ! rand.nextInt(11)-5
  }
}

この場合、クロージャーを使用するのは絶対に悪いことですか? もちろん、Int の代わりに AtomicInteger を使用することも、たとえば netty を使用するネットワーク シナリオで、スレッドセーフチャネルで書き込み操作を発行することもできますが、これはここでの私のポイントではありません。

ばかげたことを尋ねるリスクがあります: フューチャーの onSuccess が、 receive関数でケースを定義せずに、匿名の中間アクターの代わりにこのアクターで実行する方法はありますか?

編集

より明確に言えば、私の質問は次のとおりです。特定の Actor と同じスレッドで一連の Future を強制的に実行する方法はありますか?

4

2 に答える 2

5

このような設計を実装する最も簡単な方法は、「ファイア アンド フォーゲット」セマンティックを使用することです。

class Accumulator extends Actor {
  private[this] var counter = 0

  def receive = {
    case Start => ActorTest.genRef ! Request
    case x: Int => {
      counter += x
      Log.info("Accumulator", "counter = " + counter)
      self ! Start
    }
  }
}

このソリューションは完全に非同期であり、タイムアウトは必要ありません。

于 2012-06-21T05:31:56.180 に答える
5

問題はonSuccess、アクターが実行されるスレッドとは異なるスレッドで がreceive実行されることです。このpipeToアプローチを使用するか、Agentを使用できます。を作成すれば問題は解決counterAtomicIntegerますが、それほどクリーンではありません。つまり、アクター モデルが壊れてしまいます。

于 2012-06-21T02:04:13.097 に答える