最近、akka ストリームを使用していくつかの小さな Web 処理サービスを構築し始めました。とても簡単です。redis から URL を取得し、それらの URL (画像) をダウンロードします。後で画像を処理し、それらを s3 にプッシュし、json を redis にプッシュします。
複数のサイトからさまざまな種類の画像をダウンロードしています。404、Unexpected disconnect 、Response Content-Length 17951202 が構成された制限の 8388608 を超えています、EntityStreamException: Entity stream truncation and redirects などのエラーが大量に発生します。リダイレクトを使用して、応答のロケーションヘッダーにあるアドレスで requestWithRedirects を呼び出しています。
ダウンロードを担当する部分は、次のようになります。
override lazy val http: HttpExt = Http()
def requestWithRedirects(request: HttpRequest, retries: Int = 10)(implicit akkaSystem: ActorSystem, materializer: FlowMaterializer): Future[HttpResponse] = {
TimeoutFuture(timeout, msg = "Download timed out!") {
http.singleRequest(request)
}.flatMap {
response => handleResponse(request, response, retries)
}.recoverWith {
case e: Exception if retries > 0 =>
requestWithRedirects(request, retries = retries - 1)
}
}
TimeoutFuture は非常に単純で、future と timeout が必要です。future がタイムアウトよりも長くかかる場合は、timeout 例外で他の future を返します。私が抱えている問題は、しばらくするとエラーが発生することです:
Message: RuntimeException: Exceeded configured max-open-requests value of [128] akka.http.impl.engine.client.PoolInterfaceActor$$anonfun$receive$1.applyOrElse in PoolInterfaceActor.scala::109
akka.actor.Actor$class.aroundReceive in Actor.scala::467
akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorSubscriber$$super$aroundReceive in PoolInterfaceActor.scala::46
akka.stream.actor.ActorSubscriber$class.aroundReceive in ActorSubscriber.scala::208
akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorPublisher$$super$aroundReceive in PoolInterfaceActor.scala::46
akka.stream.actor.ActorPublisher$class.aroundReceive in ActorPublisher.scala::317
akka.http.impl.engine.client.PoolInterfaceActor.aroundReceive in PoolInterfaceActor.scala::46
akka.actor.ActorCell.receiveMessage in ActorCell.scala::516
akka.actor.ActorCell.invoke in ActorCell.scala::487
akka.dispatch.Mailbox.processMailbox in Mailbox.scala::238
akka.dispatch.Mailbox.run in Mailbox.scala::220
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec in AbstractDispatcher.scala::397
scala.concurrent.forkjoin.ForkJoinTask.doExec in ForkJoinTask.java::260
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask in ForkJoinPool.java::1339
scala.concurrent.forkjoin.ForkJoinPool.runWorker in ForkJoinPool.java::1979
scala.concurrent.forkjoin.ForkJoinWorkerThread.run in ForkJoinWorkerThread.java::107
何が問題なのかはわかりませんが、正しく終了していないダウンロードがいくつかあり、しばらくすると上記のエラーが発生して接続のグローバルプールにとどまると思います。問題の原因となっている可能性のあるアイデアはありますか? または、問題の原因を見つける方法: 既に 404 応答をテストしましたが、Response Content-Length が ... エラーを超えていますが、トラブルメーカーではないようです。
編集:おそらく問題は私のTimeoutFutureにあります。ここhttps://stackoverflow.com/a/29330010/2963977で説明されているようにエラーで埋めていますが、私の意見では、実際にイメージをダウンロードすることは決して完了せず、接続プールのリソースを消費しています。
私の場合、これらの設定が影響を与えないのはなぜだろうか:
akka.http.client.connecting-timeout = 1 s
akka.http.client.idle-timeout = 1 s
akka.http.host-connection-pool.idle-timeout = 1 s
EDIT2:
どうやらタイムアウトはまだサポートされていません。これが私のバグレポートです https://github.com/akka/akka/issues/17732#issuecomment-112315953