2

チャット アプリケーションがあるとします。

クライアントはチャットにメッセージを送信し、その結果、一部のアクターに何らかのコマンドが送信されました。今、私は彼が書いたものをすぐに処理して、このチャットで他のユーザーが利用できるようにしたいので、このコマンドを処理します。同時に、このメッセージをチャット履歴データベースに保存する必要があることを自分自身 (俳優) に伝えたいのですが、今はそうではありません。データベースへの保存は 2 分ごとに行われます。クラッシュが発生した場合でも、とにかくデータベースに永続化できるはずです。

ワークフローは次のようになると思います:

  1. ユーザーがメッセージを送信
  2. チャット ルーム アクターは、このメッセージでコマンドを受け取りました
  3. このメッセージを全員にブロードキャストし、このメッセージをある種のキューに追加して、チャット履歴データベースに保持します
  4. 一部の持続コマンドは、2 分間のタイムアウトが経過したときに実行されます。まだ永続化されていないすべての受信チャット メッセージを到着順に収集します。
  5. すべてのメッセージでトランザクションを実行し、キューから削除します。
  6. 3 以降のどこかでクラッシュが発生し、メッセージが永続化されなかった場合は、再度永続化を試みる必要があります。それらが永続化された場合、二度とそれらを永続化しようとするべきではありません。

Akka でこのようなものを構築するにはどうすればよいですか? どの機能/パターンを使用する必要がありますか?

4

1 に答える 1

5

2 つのアクターが必要になる場合があります。1 つ (コーディネーター) は、チャット コマンドに関する通知をクライアントに送信します。もう 1 つの (スロットラー) - 2 分ごとにデータベースにデータをプッシュします。キューはスロットラーの内部状態になります。

class Coordinator extends Actor {
   def receive = {
     case command: Record => 
          broadcast(command)
          throttler ! command
   }
}


class Throttler extends Actor {

  import system.dispatcher

  val queue = mutable.List[Record] //it may be a cache instead

  def schedule = system.scheduler.scheduleOnce(2 minutes, self, Tick) // http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html


  def receive = {
       case Start => schedule
       case command: Record =>
           queue ++= command
       case Tick => 
          schedule
          try {
            //---open transaction here---
            for (r <- queue) push(r)
            //---close transaction here---
            queue.clear //will not be cleared in case of exception
          } catch {...}
  }
}

@abatyuk が言ったように、 FSM ベースの実装を使用することもできます。

メールボックスへの負荷を減らす必要がある場合は、Akka Work Pullingなどのバックプレッシャー/負荷分散パターンを試すことができます。

ノード自体を保護する (サーバー ノードの一部に障害が発生した場合にキューの状態を回復する) 場合は、Akka Cluster を使用して (手動で) キューの状態をレプリケートできます。その場合、コーディネーターは Cluster Singleton である必要があり、ティックをランダムなアクター (これにはバスを使用することができます) に送信し、スーパーバイザーとしての成功と失敗を維持する必要があります。スーパーバイザーの状態が失われる可能性があるため、ノードを介して複製する必要があることに注意してください (2 分ごとにノード間で状態をマージします。SortedSetマージは のようなものになるため、キューに使用することをお勧めしますsets.reduce(_ ++ _))。

Riak のようなストレージは、クラスター化の問題を解決する簡単な方法を既に提供しているため、それらをキューとして使用できます (コーディネーターとスロットラーの両方が「ステートレス」シングルトンになります)。Riak の場合、Available+Partitioning (CAP-theorem を参照) として構成できます。これは、データのマージがここでは問題にならないためです。チャット履歴はCRDT(競合のない)データ型です。

もう 1 つの解決策は、スロットラーとして WriteBehind モード (2 分ごとに起動するように構成されている) を使用するキャッシュです。

イベント ソーシングはアクターの状態を保護することもできますが、復元後にすべてのアクションをやり直す必要がある場合に便利です (これは必要ありません。データベースにすべてを再送信します)。スナップショットを使用できます (キャッシュを直接使用するのとほとんど同じです) が、可用性を気にする場合は、ローカル FS ではなく (SnapshotStore を実装して) キャッシュに保存することをお勧めします。ストレージ サイズを減らすために、以前に保存したスナップショットを削除する必要がある場合もあります。また、状態が失われないように、すべてのレコードを同期的に保存する必要があります。

PS メッセージを送信者 (JavaScript に対して) に確認することを忘れないでください。そうしないと、キューとしてキャッシュを使用しても、最後のメッセージ (メールボックス内) が失われる可能性があります。

PS/2 データベースは、速度が遅く、利用できなくなる可能性があるため、ほとんどの場合、アクターの状態の永続化には不適切なソリューションです。また、MongoDB のような強力な一貫性のある NoSQL ソリューションもお勧めしません。この場合、結果整合性が最適な選択です。

于 2014-12-20T12:05:22.767 に答える