akka-stream-experimental
ヒント: この回答はバージョンに基づいてい2.0-M2
ます。他のバージョンでは、API が若干異なる場合があります。
接続を閉じる簡単な方法は、次を使用することPushStage
です。
import akka.stream.stage._
val closeClient = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]) = elem match {
case "goodbye" ⇒
// println("Connection closed")
ctx.finish()
case msg ⇒
ctx.push(msg)
}
}
クライアント側またはサーバー側で受信されるすべての要素 (および一般に を通過するすべての要素Flow
) は、そのようなStage
コンポーネントを通過します。Akka では、完全な抽象化は と呼ばれます。GraphStage
詳細については、公式ドキュメントを参照してください。
を使用すると、PushStage
具体的な着信要素の値を監視し、それに応じてコンテキストを変換できます。上記の例では、goodbye
メッセージが受信されるとコンテキストを終了します。それ以外の場合は、push
メソッドを介して値を転送します。
これで、メソッドcloseClient
を介してコンポーネントを任意のフローに接続できます。transform
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ⇒ closeClient)
.map(_ ⇒ StdIn.readLine("> "))
.map(_ + "\n")
.map(ByteString(_))
connection.join(flow).run()
上記のフローは を受け取り、ByteString
を返します。これは、メソッドを介してByteString
接続できることを意味します。フロー内では、バイトを に送信する前に、まずバイトを文字列に変換します。がストリームを終了しない場合、要素はストリームに転送され、そこでドロップされて stdin からの入力に置き換えられ、ネットワーク経由で送り返されます。ストリームが終了すると、ステージ コンポーネントの後のすべてのストリーム処理ステップが削除されます。ストリームは閉じられます。connection
join
closeClient
PushStage