2

私にはしつこい俳優がいます。初めて起動するとき (データベースが空)、初期データを保持します。しかし、期待どおりに状態が更新されません。最初のメッセージが処理された後にのみ更新されます。状態の更新後にアクターにメッセージの処理を開始させるにはどうすればよいですか?

アクターコード

class TestActor extends PersistentActor {
  var numberOfEvents = 0

  def updateState(e: Any): Unit = {
    println("updating")
    numberOfEvents += 1
  }

  override def receiveRecover: Receive = {
    case RecoveryCompleted =>
      if (numberOfEvents == 0) {
        println("persisting")
        persist("foo")(updateState)
      }
  }

  override def receiveCommand: Receive = {
    case _ => {
      println("answering")
      sender ! numberOfEvents
    }
  }
}

テストコード

Await.result(actorRef ? "stats", Duration.Inf) shouldBe 0 // I wan't 1 here
Await.result(actorRef ? "stats", Duration.Inf) shouldBe 1

出力

persisting
answering // why this goes before updating?
updating
answering

完全なコード

4

1 に答える 1

1

再考したいことの 1 つは、通常、RecoveryCompleted イベントで状態を更新するのではなく、状態を再構成するために永続化したイベントを処理することです。RecoveryCompleted メッセージは、回復の最後に何をすべきかを処理するためのものです。これらのイベントは、永続化したジャーナルから再生されたイベントになります。スナップショットを使用している場合は、必要に応じてスナップショット イベントも取得します。

例えば:

override def receiveRecover: Receive = {
    case Added(num) =>
        updateState(num) 

    case SnapshotOffer(metadata, snapshot) ⇒
       // Restore your full state from the data in the snapshot

    case RecoveryCompleted =>
        println("Recovery completed") // use logger here
  }

一方、 receiveCommand は、着信コマンドを処理し、それらのイベントを永続化し、それらのイベントが更新された後に内部状態を更新するために使用されます。

override def receiveCommand: Receive = {
    case Add(num) => {
      println("received event and persisting")

      persist(Added(num){ evt ⇒ 
        // This gets called after the persist succeeds
        updateState(num)
        sender ! numberOfEvents
      }      
    }
  }

def updateState(e: Int): Unit = {
    println("updating")
    numberOfEvents += e
  }

メッセージングに関しては、以下に示すように、コマンドの過去形としてイベントに名前を付けると便利です。

// イベント

ケースクラスが追加されました(v:Int)

// コマンド

ケースクラス Add(v:Int)

うまくいけば、これで少しわかりやすくなります。

于 2016-04-16T11:54:41.827 に答える