2

私たちは上にいます: akka-stream-experimental_2.11 1.0.

に触発された

次のように TCP レシーバーを作成しました。

def bind(address: String, port: Int, target: ActorRef)
          (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
    val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
      val serverFlow = Flow[ByteString]
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
        .map(message => {
        target ? new Message(message); ByteString.empty
      })
      conn handleWith serverFlow
    }

    val connections = Tcp().bind(address, port)
    connections.to(sink).run()
  }

ただし、私たちの意図は、受信者がまったく応答せず、メッセージをシンクするだけにすることでした。(TCP メッセージ パブリッシャーは response を気にしません)。

それは可能ですか?akka.stream.scaladsl.Tcp.IncomingConnection はタイプ Flow[ByteString, ByteString, Unit] のフローを取るため、まったく応答しません。

はいの場合、いくつかのガイダンスをいただければ幸いです。前もって感謝します。

次の 1 つの試みは私の単体テストに合格しますが、それが最善のアイデアかどうかはわかりません。

def bind(address: String, port: Int, target: ActorRef)
          (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
    val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>

      val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target))))

      val targetSink = Flow[ByteString]
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
        .map(Message(_))
        .to(Sink(targetSubscriber))

      conn.flow.to(targetSink).runWith(Source(Promise().future))
    }

    val connections = Tcp().bind(address, port)
    connections.to(sink).run()
  }
4

1 に答える 1