cachedHostConnectionPoolHttps を使用してアウトバウンドの https リクエストを作成しようとしていますが、2XX 以外の応答でしばらくすると、プール フローが要素の出力を停止し、フロー全体が停止するようです。この特定の動作が発生する時間は非常にランダムですが、発生し、再現可能です。
これは、しばらくExpected OnNext(_), yet no element signaled during 10 seconds
スローされた後に結果の発行を停止するサンプル テストです。
import java.util.UUID
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream._
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import org.scalatest._
import scala.util.Random
class HttpPoolTest extends FlatSpec with Matchers with OptionValues with Inside with Inspectors {
"HttpPoolTest" should "run without exception" in {
implicit val httpSystem = ActorSystem("http-out")
implicit val httpMat = ActorMaterializer()
implicit val ec = httpSystem.dispatcher
val helloFlow = Http().cachedHostConnectionPoolHttps[Int]("android.googleapis.com",443)(httpMat)
val requestHeaders = scala.collection.immutable.Seq[HttpHeader](RawHeader("Authorization", "key=random" ), RawHeader("Content-Type", "application/json;charset=utf-8"))
val httpRequest = new HttpRequest(HttpMethods.POST, "/gcm/send", requestHeaders,
"""
|{
| "registration_ids": ["rnadomdata"],
| "delay_while_idle": true,
| "data": {
| "message": "Hello World",
| "title": "Interstellar",
| "id": "1231222i3211232228w2t1xx6wxq",
| "notificationType": "Text"
| }
|}
""".
stripMargin)
val (upstream , downstream) = TestSource.probe[(HttpRequest, Int)].via(helloFlow).toMat(TestSink.probe)(Keep.both).run()
while (true) {
val randWait = math.max(5000, Random.nextInt(20000)) //atleast 5 seconds to process the http request
val randRequest = math.max(1, Random.nextInt(5))
downstream.request(randRequest)
(1 to randRequest).foreach(i => upstream.sendNext(httpRequest -> UUID.randomUUID().hashCode()))
println (s"Waiting for $randWait millis to process")
Thread.sleep(randWait)
(1 to randRequest) foreach (i => noException should be thrownBy downstream.expectNext())
}
}
}
ここで何が間違っているのか?しばらくは機能するので、2xx以外のエラーの処理中に何か問題が発生していると思われます。
と をオフにしてみましakka.http.host-connection-pool.max-retries = 0
た akka.http.host-connection-pool.idle-timeout = infinite
が、結果はありません。