7

Akka Server HTTP を使用して着信要求を処理するサービス (サービス A と呼びましょう) があります。また、いくつかの Web サービスを提供するサードパーティ アプリケーション (サービス B) もあります。サービス A の目的は、クライアント要求を変換し、サービス B の 1 つまたは複数の Web サービスを呼び出し、結果をマージ/変換してクライアントに返すことです。

一部の部分にはアクターを使用し、他の部分にはフューチャーのみを使用しています。サービス B を呼び出すには、Akka HTTP クライアントを使用します。

Http.get(actorSystem).singleRequest(HttpRequest.create()
        .withUri("http://127.0.0.1:8082/test"), materializer)
        .onComplete(...)

問題は、各サービス A 要求ごとに新しいフローが作成され、複数の同時接続がある場合、次のようになることです。akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error

私はすでにこの質問をして、単一のフローを使用する提案を得ました複数の (10k - 100k) リクエストに対して Akka HTTP クライアントを適切に呼び出すには?

単一の場所からのリクエストのバッチに対しては機能しますが、すべての同時リクエスト ハンドラーから単一のフローを使用する方法がわかりません。

それを行うための正しい「Akka-way」は何ですか?

4

3 に答える 3

13

Source.queueリクエストをバッファリングするために使用できると思います。以下のコードは、サード パーティのサービスから回答を得る必要があることを前提としていますFuture[HttpResponse]。このようにして、リソースの枯渇を防ぐためのオーバーフロー戦略を提供することもできます。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Success}

import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem("main")
implicit val materializer = ActorMaterializer()
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
  .via(pool)
  .toMat(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p)) => p.failure(e)
  }))(Keep.left)
  .run
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise

val response = queue.offer(request).flatMap(buffered => {
  if (buffered) promise.future
  else Future.failed(new RuntimeException())
})

Await.ready(response, 3 seconds)

(私のブログ投稿からコピーしたコード)

于 2016-01-31T15:05:31.397 に答える
0

必要なのは、Service A コード内でHostConnectionPoolを Service B にセットアップすることだけです。これFlowにより、サービス A ストリームに追加して、ストリームごとの新しい接続ではなく、接続プールを使用して A から B にリクエストをディスパッチできる が得られます。ドキュメントから:

接続レベルのクライアント側 API とは対照的に、ホストレベルの API を使用すると、個々の HTTP 接続を手動で管理する必要がなくなります。1 つの特定のターゲット エンドポイント (つまり、ホスト/ポートの組み合わせ) への構成可能な接続プールを自律的に管理します。

このフローの各実体化は、異なるストリームで、この基礎となる接続のプールから引き出されます。

特定のターゲット エンドポイントへの接続プールを保持するための最良の方法は、メソッドを使用することです。このメソッドは、アプリケーション レベルのストリーム設定に「焼き付ける」ことができるHttp.get(system).cachedHostConnectionPool(...)を返します。Flowこのフローは、「プール クライアント フロー」とも呼ばれます。

于 2016-01-17T15:31:37.233 に答える