4

次のコードがあります。

class ApiRoutes2[F[_]](implicit F: ConcurrentEffect[F]) extends Http4sDsl[F] {
  var queue = Queue.bounded[F, String](100)

  def createService(queue: Queue[F, String]): F[Unit] = ???

  val service: HttpRoutes[F] = HttpRoutes.of[F] {
    case PUT -> Root / "services" =>
      val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
      val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
        case Text(t, _) => F.delay(println(t))
        case f => F.delay(println(s"Unknown type: $f"))
      }

      // How to "spawn" createService?

      toClientF.flatMap { toClient =>
        WebSocketBuilder[F].build(toClient, fromClient)
      }
  }
}

createService新しいサービスを作成する機能です。新しいサービスの作成は非常に複雑なプロセスです。CI パイプラインをトリガーし、それらが終了するのを待ってから、同じ方法でさらに CI パイプラインをトリガーする必要があります。受信したキューは、実行中の現在の操作をブラウザに報告するために使用されます。

createService を同時に「生成」し、終了するまで実行させたいと考えています。ただし、同時に WebSocket をクライアントにすぐに返したいと考えています。別名、createService を「生成」している間はブロックできません。

私は立ち往生しています。使用することしか考えられませんshiftが、それは for 内包表記の次の行がブロックされ、終了するのを待ってcreateServiceから Websocket をクライアントに返すことを意味します。

私のアプローチは間違っていますか?私は何を間違っていますか?

4

1 に答える 1