2

SignalRef で fs2 ストリームを中断しようとしています。次のようにストリームをセットアップして実行します。ストリームは、含まれているときに実行され、switch含まれているfalseときに中断する必要がありswitchますtrue

  import cats.effect.IO
  import fs2.Stream
  import fs2.concurrent.SignallingRef

  import scala.concurrent.ExecutionContext
  import scala.concurrent.duration.DurationInt

  implicit val contextShift = IO.contextShift(ExecutionContext.global)
  implicit val timer = IO.timer(ExecutionContext.global)

  val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

  val program: Stream[IO, Unit] = {

      val program: Stream[IO, Unit] =
        Stream
          .repeatEval(IO{
            println(java.time.LocalTime.now)
            println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
          })
          .metered(1.second)

      program
        .interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
    }
  
  program.compile.drain.unsafeRunAsync(() => _)

次に、ストリームを中断しようとします

switch.map(_.set(true).unsafeRunSync)

しかし、流れは続きます。標準出力で私が見る

15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false

どうやらスイッチが true に変わっていないのでしょうか?

4

2 に答える 2