0

ファイルを Dropbox にアップロードするための親 -> 子アクターの関係があります。リレーションは、スーパーバイザー アクターとアップロード アクターで構成されます。スーパーバイザー アクターは、アップロード アクターのスーパーバイザー戦略を定義します。そのため、Dropbox へのアップロードが失敗した場合、再試行の最大回数に達している限り、アクターを再起動する必要があります。私のアプリケーションでは、アップロードのステータスに興味があります。そのため、ask パターンを使用して、成功または失敗の場合に未来を受け取ります。以下に、私のアクターの現在の実装を示します。

/**
 * An upload message.
 *
 * @param byte The byte array representing the content of a file.
 * @param path The path under which the file should be stored.
 */
case class UploadMsg(byte: Array[Byte], path: String)

/**
 * The upload supervisor.
 */
class UploadSupervisor extends Actor {

  /**
   * Stores the sender to the executing actor.
   */
  val senders: ParHashMap[String, ActorRef] = ParHashMap()

  /**
   * Defines the supervisor strategy
   */
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: DbxException => Restart
    case e: Exception => Stop
  }

  /**
   * Handles the received messages.
   */
  override def receive: Actor.Receive = {
    case msg: UploadMsg =>
      implicit val timeout = Timeout(60.seconds)

      val actor = context.actorOf(PropsContext.get(classOf[UploadActor]))
      senders += actor.path.toString -> sender
      context.watch(actor)
      ask(actor, msg).mapTo[Unit] pipeTo sender

    case Terminated(a) =>
      context.unwatch(a)
      senders.get(a.path.toString).map { sender =>
        sender ! akka.actor.Status.Failure(new Exception("Actor terminated"))
        senders - a.path.toString
      }
  }
}

/**
 * An aktor which uploads a file to Dropbox.
 */
class UploadActor @Inject() (client: DropboxClient) extends Actor {

  /**
   * Sends the message again after restart.
   *
   * @param reason The reason why an restart occurred.
   * @param message The message which causes the restart.
   */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    message foreach { self forward }
  }

  /**
   * Handles the received messages.
   */
  override def receive: Receive = {
    case UploadMsg(byte, path) =>
      val encrypted = encryptor.encrypt(byte)
      val is = new ByteArrayInputStream(encrypted)
      try {
        client.storeFile("/" + path, DbxWriteMode.force(), encrypted.length, is)
        sender ! (())
      } finally {
        is.close()
      }
  }
}

私の質問は次のとおりです。指定された回数または再試行の後にアップロードアクターが失敗したことをアプリケーションに伝えるためのより良い解決策はありますか? 特に、アクターの送信者を格納するマップは、少しぎこちなく感じます。

4

1 に答える 1

1

サーキットブレーカーを使用する必要があります

val breaker =
 new CircuitBreaker(context.system.scheduler,
  maxFailures = 5,
  callTimeout = 10.seconds,
  resetTimeout = 1.minute)

次に、メッセージをブレーカーでラップします。

sender() ! breaker.withSyncCircuitBreaker(dangerousCall)

ブレーカーには、クローズド、オープン、ハーフオープンの 3 つの状態があります。通常の状態は Closed ですが、メッセージが $maxFailures 回失敗すると、状態が Open に変更されます。ブレーカーは、状態変化のコールバックを提供します。何かをしたい場合は、それを使用してください。例えば:

breaker onOpen { sender ! FailureMessage()}
于 2015-02-12T10:03:52.660 に答える