14

私は akka ストリームを使用しており、フローが特定の値を処理できないため、条件付きでスキップする必要があるグラフのセグメントがあります。具体的には、文字列を受け取って http リクエストを行うフローがありますが、文字列が空の場合、サーバーはケースを処理できません。しかし、代わりに空の文字列を返す必要があります。失敗することを知っているhttpリクエストを実行する必要なく、これを行う方法はありますか? 私は基本的にこれを持っています:

val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val flow = source.via(httpRequest).via(httpResponse)

私が考えられる唯一のことは、httpResponse フローで 400 エラーをキャッチし、デフォルト値を返すことです。しかし、事前に失敗することがわかっているリクエストに対してサーバーにアクセスするオーバーヘッドを回避できるようにしたいと考えています。

4

2 に答える 2

16

使用できますflatMapConcat

(警告: コンパイルされたことはありませんが、要点はわかります)

val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val makeHttpCall: Flow[HttpRequest, HttpResponse, _]
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse
val emptyStringSource = Source.single("")
val cleanerSource = source.flatMapConcat({
  case "" => emptyStringSource
  case other => Source.single(other) via someHttpTransformation
})
于 2015-11-20T12:03:23.757 に答える
14

Viktor Klang のソリューションは簡潔でエレガントです。グラフを使用して別の方法を示したかっただけです。

文字列のソースを 2 つのストリームに分割し、一方のストリームで有効な文字列をフィルタリングし、もう一方のストリームで無効な文字列をフィルタリングできます。次に、結果をマージします ("ストリームを横切る")。

ドキュメントに基づく:

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
  import FlowGraph.Implicits._

  val source = Source(List("1", "2", "", "3", "4"))
  val sink : Sink[String,_] = ???

  val bcast = builder.add(Broadcast[String](2))
  val merge = builder.add(Merge[String](2))

  val validReq =   Flow[String].filter(_.size > 0)
  val invalidReq = Flow[String].filter(_.size == 0)

  val httpRequest: Flow[String, HttpRequest, _] = ???
  val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
  val httpResponse: Flow[HttpResponse, String, _] = ???
  val someHttpTransformation = httpRequest via makeHttpCall via httpResponse

  source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink
            bcast ~>      invalidReq                    ~> merge
  ClosedShape
})

注: このソリューションはストリームを分割するため、シンクは文字列値の結果を入力に基づく予想とは異なる順序で処理する場合があります。

于 2015-11-20T13:07:55.973 に答える