0

次の問題を解決するために、akka-streams と akka-http を使用しようとしています。

  • 2 つの http クライアント (A および B) が 1 つの http サーバー (C) にリクエストを送信します。どちらの側も akka-http を使用して通信します。
  • A からの要求は B に比べて優先度が高くなりますが、両方の要求が同等に処理される必要があります。したがって、C は A からの要求を最初に処理し、B からの要求は 2 番目に優先されます。
  • もちろん、両端でバックプレッシャーを有効にしたいと考えています

着信接続を 1 つの出力にマージするために、次のコードを考え出しました。

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit b: FlowGraph.Builder[Unit] =>
    import FlowGraph.Implicits._

    val merge = b.add(MergePreferred[IncomingConnection](1))

    val inA: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8200)
    val inB: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8201)

    inA ~> merge.preferred
    inB ~> merge.in(0)
           merge.out ~> Sink.foreach(println)

    ClosedShape
}).run()

したがって、A と B からの IncomingConnection インスタンスを持つソースがあります。

今、私はそれらを何らかの方法で処理し、応答を生成し、対応する接続​​に応答を送信したいと考えています。

これらすべてをアーカイブするためのより良い方法があるかもしれませんが、ドキュメントや他の人からの質問でそのような問題を解決する例を見つけることができませんでした.

また、問題はかなり一般的だと思います。

よろしくお願いします。

4

1 に答える 1

1

IncomingConnectionリクエストを処理してレスポンスを返すためにオブジェクトを使用する方法は、メソッドを使用するhandleXXXことです。ドキュメントapiに基づく例:

同期機能

//this processor contains your logic for converting a request into a response
def requestProcessor(httpRequest : HttpRequest) : HttpResponse = ???

val connectionSink = Sink.foreach[IncomingConnection] { conn =>
  conn.handleWithSyncHandler(requestProcessor)
}

これconnectionSinkは、グラフ構造で使用できます。

inA ~> merge.preferred
inB ~> merge.in(0)
       merge.out ~> connectionSink

非同期関数

同様に、プロセッサが非同期の場合:

val asyncReqProcessor(httpRequest : HttpRequest) : Future[HttpResponse] = ???

val connectionSink = 
  Sink.foreach[IncomingConnection](_ handleWithAsyncHandler asyncProcessor)

フロー

そして最後に、フローを使用することもできます (私の好みの方法):

val flowReqProcessor : Flow[HttpRequest, HttpResponse,_] = ???

val connectionSink = 
  Sink.foreach[IncomingConnection](_ handleWith flowReqProcessor )

フローに関する私のお気に入りの機能の 1 つは、さまざまなストリームでフローを何度も再利用できることです。したがって、flowReqProcessor は のval代わりに にすることができますdef

于 2015-12-16T12:47:33.043 に答える