次のコードがあります。
class ApiRoutes2[F[_]](implicit F: ConcurrentEffect[F]) extends Http4sDsl[F] {
var queue = Queue.bounded[F, String](100)
def createService(queue: Queue[F, String]): F[Unit] = ???
val service: HttpRoutes[F] = HttpRoutes.of[F] {
case PUT -> Root / "services" =>
val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
case Text(t, _) => F.delay(println(t))
case f => F.delay(println(s"Unknown type: $f"))
}
// How to "spawn" createService?
toClientF.flatMap { toClient =>
WebSocketBuilder[F].build(toClient, fromClient)
}
}
}
createService
新しいサービスを作成する機能です。新しいサービスの作成は非常に複雑なプロセスです。CI パイプラインをトリガーし、それらが終了するのを待ってから、同じ方法でさらに CI パイプラインをトリガーする必要があります。受信したキューは、実行中の現在の操作をブラウザに報告するために使用されます。
createService を同時に「生成」し、終了するまで実行させたいと考えています。ただし、同時に WebSocket をクライアントにすぐに返したいと考えています。別名、createService を「生成」している間はブロックできません。
私は立ち往生しています。使用することしか考えられませんshift
が、それは for 内包表記の次の行がブロックされ、終了するのを待ってcreateService
から Websocket をクライアントに返すことを意味します。
私のアプローチは間違っていますか?私は何を間違っていますか?