9

私のシナリオでは、クライアントが "さようなら" websocket メッセージを送信し、サーバー側で以前に確立された接続を閉じる必要があります。

akka-http docsから:

接続を閉じるには、サーバー ロジックから着信接続フローをキャンセルします (たとえば、その下流を Sink.cancelled に接続し、その上流を Source.empty に接続するなど)。IncomingConnection ソース接続をキャンセルして、サーバーのソケットをシャットダウンすることもできます。

しかし、それを考慮してそれを行う方法は明確ではなくSinkSource新しい接続をネゴシエートするときに一度設定されます:

(get & path("ws")) {
  optionalHeaderValueByType[UpgradeToWebsocket]() {
    case Some(upgrade) ⇒
      val connectionId = UUID()
      complete(upgrade.handleMessagesWithSinkSource(sink, source))
    case None ⇒
      reject(ExpectedWebsocketRequestRejection)
  }
}
4

3 に答える 3

5

akka-stream-experimentalヒント: この回答はバージョンに基づいてい2.0-M2ます。他のバージョンでは、API が若干異なる場合があります。


接続を閉じる簡単な方法は、次を使用することPushStageです。

import akka.stream.stage._

val closeClient = new PushStage[String, String] {
  override def onPush(elem: String, ctx: Context[String]) = elem match {
    case "goodbye" ⇒
      // println("Connection closed")
      ctx.finish()
    case msg ⇒
      ctx.push(msg)
  }
}

クライアント側またはサーバー側で受信されるすべての要素 (および一般に を通過するすべての要素Flow) は、そのようなStageコンポーネントを通過します。Akka では、完全な抽象化は と呼ばれます。GraphStage詳細については、公式ドキュメントを参照してください。

を使用すると、PushStage具体的な着信要素の値を監視し、それに応じてコンテキストを変換できます。上記の例では、goodbyeメッセージが受信されるとコンテキストを終了します。それ以外の場合は、pushメソッドを介して値を転送します。

これで、メソッドcloseClientを介してコンポーネントを任意のフローに接続できます。transform

val connection = Tcp().outgoingConnection(address, port)

val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .transform(() ⇒ closeClient)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_ + "\n")
  .map(ByteString(_))

connection.join(flow).run()

上記のフローは を受け取り、ByteStringを返します。これは、メソッドを介してByteString接続できることを意味します。フロー内では、バイトを に送信する前に、まずバイトを文字列に変換します。がストリームを終了しない場合、要素はストリームに転送され、そこでドロップされて stdin からの入力に置き換えられ、ネットワーク経由で送り返されます。ストリームが終了すると、ステージ コンポーネントの後のすべてのストリーム処理ステップが削除されます。ストリームは閉じられます。connectionjoincloseClientPushStage

于 2015-12-18T22:50:13.043 に答える
4

これは、現在の (2.4.14) バージョンの akka-stream では次のようにして実現できます。

package com.trackabus.misc

import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

// terminates the flow based on a predicate for a message of type T
// if forwardTerminatingMessage is set the message is passed along the flow
// before termination
// if terminate is true the stage is failed, if it is false the stage is completed
class TerminateFlowStage[T](
    pred: T => Boolean, 
    forwardTerminatingMessage: Boolean = false, 
    terminate: Boolean = true)
  extends GraphStage[FlowShape[T, T]]
{
  val in = Inlet[T]("TerminateFlowStage.in")
  val out = Outlet[T]("TerminateFlowStage.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) {

      setHandlers(in, out, new InHandler with OutHandler {
        override def onPull(): Unit = { pull(in) }

        override def onPush(): Unit = {
          val chunk = grab(in)

          if (pred(chunk)) {
            if (forwardTerminatingMessage)
              push(out, chunk)
            if (terminate)
              failStage(new RuntimeException("Flow terminated by TerminateFlowStage"))
            else
              completeStage()
          }
          else
            push(out, chunk)
        }
      })
  }
}

それを使用してステージを定義します

val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe])

そしてそれをフローの一部として含めます

.via(termOnKillMe)
于 2016-12-04T19:43:19.877 に答える