14

メッセージを処理して特定のエンティティを作成するアクターが Akka にいます。これらのエンティティの一部のフィールドは、作成時のデータベース内の他のエンティティの状態に基づいて計算されます。

アクターの処理が、データベースがエンティティを永続化できる速度よりも速くなる競合状態を作成することは避けたいと思います。これにより、次のようにデータの一貫性が失われる可能性があります。

  • アクターは を作成し、Fooそれを他のアクターに送信して、さらに処理して保存します
  • アクターは別の を作成するように求められますFoo。最初のものはまだ保存されていないため、DB の古い内容に基づいて新しいものを作成し、間違ったFoo.

Foos の作成は手動でトリガーされるため、この可能性はほとんどありません。ただし、ダブルクリックが高負荷下で問題を引き起こす可能性があることは依然として考えられます。Fooそして、明日が自動的に作成されるかどうかは誰にもわかりません。

したがって、私が必要としているのは、アクターに待機するように指示し、Foos が保存されたことを確認してから操作を再開する方法です。

アクターをアイドル状態にして、しばらくしてから操作を再開するように指示する方法はありますか?

基本的には、メールボックスをメッセージ キューとして使用し、キューの処理速度を制御したいと考えています。

4

2 に答える 2

24

いいえ、アクターを一時停止することはできません。アクターは常にメールボックスからできるだけ早くメッセージをプルします。これにより、着信リクエストが隠蔽され、後で処理される可能性のみが残ります。

class A(db: ActorRef) extends Actor with Stash {
  def receive = {
    case Request =>
      doWork()
      db ! Persist
      context.setReceiveTimeout(5.seconds)
      context.become({
        case Request        => stash()
        case Persisted      => context.unbecome(); unstashAll()
        case ReceiveTimeout => throw new TimeoutException("not persisted")
      }, discardOld = false)
  }
}

メッセージの配信は保証されない (またはデータベースがダウンしている可能性がある) ため、タイムアウトを使用することをお勧めします。

根本的な問題

この問題は、ほとんどの場合、アクター モデルとドメイン モデルの間で十分に調整されていない場合に発生します。アクターは一貫性の単位ですが、ユースケースでは、一貫したイメージには最新の外部エンティティ (データベース)、アクターが正しいことを行うようにします。ユースケースについて詳しく知らずに解決策を推奨することはできませんが、これを考慮して問題を再構築してみてください。

于 2013-01-28T17:05:53.933 に答える
6

これには数行しか必要ないことがわかります。これは私が思いついた解決策であり、pagoda_5b の提案に同意します。

class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
  import QueueingActor._

  def receive = {
    case message =>
      context.become({
        case Resume =>
          unstashAll()
          context.unbecome()
        case _ => stash()
      })
      nextActor ! message
  }
}

object QueueingActor {
  case class Resume()
}
于 2013-01-28T16:25:12.230 に答える