私たちは上にいます: akka-stream-experimental_2.11 1.0.
例に触発された
次のように TCP レシーバーを作成しました。
def bind(address: String, port: Int, target: ActorRef)
(implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
val serverFlow = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(message => {
target ? new Message(message); ByteString.empty
})
conn handleWith serverFlow
}
val connections = Tcp().bind(address, port)
connections.to(sink).run()
}
ただし、私たちの意図は、受信者がまったく応答せず、メッセージをシンクするだけにすることでした。(TCP メッセージ パブリッシャーは response を気にしません)。
それは可能ですか?akka.stream.scaladsl.Tcp.IncomingConnection はタイプ Flow[ByteString, ByteString, Unit] のフローを取るため、まったく応答しません。
はいの場合、いくつかのガイダンスをいただければ幸いです。前もって感謝します。
次の 1 つの試みは私の単体テストに合格しますが、それが最善のアイデアかどうかはわかりません。
def bind(address: String, port: Int, target: ActorRef)
(implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target))))
val targetSink = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(Message(_))
.to(Sink(targetSubscriber))
conn.flow.to(targetSink).runWith(Source(Promise().future))
}
val connections = Tcp().bind(address, port)
connections.to(sink).run()
}