2

akka-http websocket サーバーと単純な javascript の間の基本的な接続をセットアップしようとしています。

約 20 の接続のうち 1 つが成功し、残りは失敗します。接続のセットアップがそれほど信頼できない理由がわかりません。

アプリケーション.scala:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import services.WebService

import scala.concurrent.Await
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException


object Application extends App {

  implicit val system = ActorSystem("api")
  implicit val materializer = ActorMaterializer()

  import system.dispatcher

  val config = system.settings.config
  val interface = config.getString("app.interface")
  val port = config.getInt("app.port")

  val service = new WebService

  val binding = Http().bindAndHandle(service.route, interface, port)

  try {
    Await.result(binding, 1 second)
    println(s"server online at http://$interface:$port/")
  } catch {
    case exc: TimeoutException =>
      println("Server took to long to startup, shutting down")
      system.shutdown()
  }
}

WebService.scala:

import actors.{PublisherActor, SubscriberActor}
import akka.actor.{Props, ActorSystem}
import akka.http.scaladsl.model.ws.{Message, TextMessage}

import akka.http.scaladsl.server.Directives
import akka.stream.Materializer
import akka.stream.scaladsl.{Source, Flow}
import scala.concurrent.duration._


class WebService(implicit fm: Materializer, system: ActorSystem) extends Directives {

  import system.dispatcher
  system.scheduler.schedule(15 second, 15 second) {
    println("Timer message!")
  }

  def route =
    get {
      pathSingleSlash {
        getFromResource("web/index.html")
      } ~
        path("helloworld") {
          handleWebsocketMessages(websocketActorFlow)
        }
    }

  def websocketActorFlow: Flow[Message, Message, Unit] =
    Flow[Message].collect({
      case TextMessage.Strict(msg) =>
        println(msg)
        TextMessage.Strict(msg.reverse)
    })
}

クライアント側:

<input type="text" id="inputMessage"/><br/>
<input type="button" value="Send!" onClick="sendMessage()"/><br/>
<span id="response"></span>
<script type="application/javascript">

    var connection;
    function sendMessage() {
        connection.send(document.getElementById("inputMessage").value);
    }

    document.addEventListener("DOMContentLoaded", function (event) {
        connection = new WebSocket("ws://localhost:8080/helloworld");

        connection.onopen = function (event) {
            connection.send("connection established");
        };

        connection.onmessage = function (event) {
            console.log(event.data);
            document.getElementById("response").innerHTML = event.data;
        }
    });
</script>

サーバーへの接続が失敗した場合、5 秒後に次のようなタイムアウト メッセージが表示されます。

[DEBUG] [07/23/2015 07:59:54.517] [api-akka.actor.default-dispatcher-27] [akka://api/user/$a/flow-76-3-publisherSource-prefixAndTail] Cancelling akka.stream.impl.MultiStreamOutputProcessor$SubstreamOutput@a54778 (after: 5000 ms)

接続が失敗しても成功しても、常に次のログ メッセージが表示されます。

[DEBUG] [07/23/2015 07:59:23.849] [api-akka.actor.default-dispatcher-4] [akka://api/system/IO-TCP/selectors/$a/0] New connection accepted
4

1 に答える 1

0

そのエラー メッセージを注意深く見てください...それは、私が予期していなかったソースから来ています。

これはwebSocketActorFlow、おそらくメッセージを受け取っていて、フローに捕らえられていない可能性があることを示しています。そのため、予期しないサブストリームになってしまいます。

したがって、「一部の時間だけ機能する」のではなく、「ほとんどの時間機能しているが、フローで要求したすべての入力を処理できず、選択できないストリームが残っている可能性があります。最初に死ぬ。

次のいずれかができるかどうかを確認してください。a) ストリームを確実に把握して取り残されないようにする、b) バンドエイドでタイムアウトを調整する、c)そのような発生を検出してダウンストリームの処理をキャンセルする

https://groups.google.com/forum/#!topic/akka-user/x-tARRaJ0LQ

akka { stream { materializer { subscription-timeout { timeout=30s } } } }

http://grokbase.com/t/gg/akka-user/1561gr0jgt/debug-message-cancelling-akka-stream-impl-multistreamoutputprocessor-after-5000-ms

于 2016-04-04T01:39:24.693 に答える