5

akka-streams ベースの Tcp サーバーでの close() の予想される動作は、クライアントがすぐに SocketException を確認し、例外の後に発行されたメッセージがサーバーに配信されないことです。ただし、例外の前に送信された最後のメッセージを配信する必要があります。新しいクライアントはすべて ConnectException に遭遇するはずです。

Tcp サーバーに対して次のように実装 close() を使用しているakka-streams 1.0 ドキュメントの「接続を閉じる」に関する段落を解読できません。

  val serverSource: Promise[ByteString] = Promise[ByteString]()

  var serverBindingOption: Option[Tcp.ServerBinding] = None

  def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int)
    (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[SyslogStreamServer] = {
    implicit val executionContext = system.dispatcher
    val sink = Sink.foreach {
      conn: Tcp.IncomingConnection =>
        val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))

        val targetSink = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = MaxFrameLength, allowTruncation = true))
          .map(raw ⇒ Message(raw))
          .to(Sink(targetSubscriber))

        conn.flow.to(targetSink).runWith(Source(serverSource.future).drop(1)) // drop the last one for no response
    }

    for {
      serverBinding ← Tcp().bind(address, port).to(sink).run()
    } yield {
      serverBindingOption = Some(serverBinding)
      new SyslogStreamServer
    }
  }

  def close()(implicit ec: ExecutionContext): Future[Unit] = {
    serverSource.success(ByteString.empty) // dummy value to be dropped above
    serverBindingOption match {
      case Some(serverBinding) ⇒ serverBinding.unbind()
      case None ⇒ Future[Unit] {}
    }
  }

これはサーバーを閉じる良い方法ですか? (展開用など)

上記の close() の実装の有効性について、誰かがレビューしてコメントできれば幸いです。このコードがすべての要件を満たしているかどうかを証明しようとしていますが、最初にフローのソースを成功させるべきか、最初に serverBinding をバインド解除するべきか疑問に思っています。レビュー/批判は大歓迎です。前もって感謝します !

4

0 に答える 0