2

Akka (バージョン 2.4.0) Persistency と Cassandra プラグイン (バージョン 0.6、 https://github.com/krasserm/akka-persistence-cassandra ) を使用して障害から回復するアプリを作成しようとしました。イベントは何の問題もなく cassandra に保存されていますが、1 つを強制終了してアクターしようとしたため、スーパーバイザーがそれを再起動しましたが、イベントは によって受信されませんreceiveRecover

問題はプラグイン自体にあるようです。cassandra の代わりに共有 LevelDB を使用しているかのように、回復ステップでイベントが受信されています。

私の永続的なアクターの実装は次のとおりです。



    class SimplePersistentActor extends PersistentActor with ActorLogging {

      def persistenceId: String = context.self.path.name

      override def preRestart(cause: Throwable, msg: Option[Any]) = {
        log.debug(s"Restarting ${getClass.getSimpleName}")
        super.preRestart(cause, msg)
      }

      override def postStop() = {
        log.debug(s"Stopping ${getClass.getSimpleName}")
        super.postStop()
      }

      var transactionData: Either[UninitializedData, RunningTransactionData] = Left(UninitializedData())

      def receiveCommand ={
        case msg @ TransactionStart(transactionId) =>
          persist(msg) { _ => }
          log.debug(s"Starting a transaction with id $transactionId")
          transactionData = Right(RunningTransactionData(transactionId, List()))

          /* Send a reply */
          sender() ! transactionId

        case msg @ TransactionData(data) =>
          persist(msg) { _ => }

          transactionData match {
            case Right(t: RunningTransactionData) =>
              val updatedTransaction = t.copy(data = t.data ::: List(data))
              log.debug(s"There are ${updatedTransaction.data.size} data items within a transaction ${t.transactionId}")
              transactionData = Right(updatedTransaction)

              /* Send a reply */
              sender() ! t.transactionId          

            case _ => log.error("Actor's transaction data is not initialized")
          }

        case TransactionEnd(transactionId) =>
          transactionData match {
            case Right(t: RunningTransactionData) =>
              log.debug(s"Ending a transaction with id ${t.transactionId}")
              transactionData = Left(UninitializedData())

              /* Send a reply */
              sender() ! t.transactionId

            case _ => log.error("Actor's transaction data is not initialized")
          }      

        case other =>
          log.debug(s"Unexpected event received: $other")
      }

      def receiveRecover = {
        case message =>
          log.debug(s"Recovery Step. Message $message received")
      }
    }


上記のどちらの場合でも、コードは変更されません。誰もこの問題を見たことがありますか?

4

1 に答える 1