5

ActorMaterialzer の監視戦略を使用して、エラー時に akka-stream を再起動できることを知っています。

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _                      => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)

ソース: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-error.html

次のユースケースがあります。

/***
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-experimental"            % "2.4.2",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2"
)
*/

import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._

import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import Uri.Query

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

import scala.util.{Success, Failure}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.Future

object SO extends DefaultJsonProtocol {

  implicit val system = ActorSystem()
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val httpFlow = Http().cachedHostConnectionPoolHttps[HttpRequest]("example.org")

  def search(query: Char) = {
    val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
    (request, request)
  }

  case class Hello(name: String)
  implicit val helloFormat = jsonFormat1(Hello)

  val searches =
    Source('a' to 'z').map(search).via(httpFlow).mapAsync(1){
      case (Success(response), _) => Unmarshal(response).to[Hello]
      case (Failure(e), _) => Future.failed(e)
    }

  def main(): Unit = {
    Await.result(searches.runForeach(_ => println), Duration.Inf)
    ()
  }
}

クエリが非整列化に失敗することがあります。https://example.org/?q=vアルファベット全体を再起動せずに、その単一のクエリで再試行戦略を使用したいと考えています 。

4

2 に答える 2

1

上記のストリーム ソリューションでは、ストリームの最後の要素に対する再試行は実行されません。これは、最後の要素を送信した後にアップストリームが完了すると、マージも完了するためです。その後、唯一の出力は非再試行アウトレットから来ましたが、要素が再試行されるため、それも完了します。

出力を生成するためにすべての入力要素が必要な場合は、アップストリームの完了が process&retry グラフに到達するのを停止する追加のメカニズムが必要になります。1 つの可能性は、プロセスとリトライ グラフからの入力と出力を監視する BidiFlow を使用して、oncomplete を伝播する前に、必要なすべての出力が (観測された入力に対して) 生成されていることを確認することです。単純なケースでは、入力要素と出力要素をカウントするだけです。

于 2016-05-07T02:21:58.287 に答える