3

測定値の fs2.Stream を返す関数があります。

import cats.effect._
import fs2._

def apply(sds: SerialPort, interval: Int)(implicit cs: ContextShift[IO]): Stream[IO, SdsMeasurement] =
  for {
     blocker <- Stream.resource(Blocker[IO])
     stream <- io.readInputStream(IO(sds.getInputStream), 1, blocker)
      .through(SdsStateMachine.collectMeasurements())
  } yield stream

テストフラグを渡さない限り、通常は無限のストリームです。テストフラグを渡した場合は、1 つの値を出力して停止する必要があります。

val infiniteSource: Stream[IO, SdsMeasurement] = ...
val source = if (isTest) infiniteSource.take(1) else infiniteSource
source.compile.drain

無限ストリームは正常に機能します。それは私にすべての測定値を無限に与えます。実際、テスト ストリームは最初の測定値のみを提供し、それ以上は提供しません。私が抱えている問題は、この最後の測定後にストリームが返されないことです。永久にブロックします。私は何を間違っていますか?

注: 本質的なコードを抽象化したと思いますが、詳細については、私のプロジェクトをご覧ください: https://github.com/jkransen/fijnstof/blob/ZIO/src/main/scala/nl/kransen/fijnstof /Main.scala

4

1 に答える 1