5

空とのマージがfs2.Stream同じを生成する必要があることは十分に文書化されていfs2.Streamます。Scaladocsからの引用は次のとおりです。

という性質を持っていますmerge(Stream.empty, s) == s

を使用した次の完全な Scala プログラムを考えてみましょうfs2.Stream

放出要素

import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}

プログラムは次のように出力します。

Got value 0
Got value 1
Got value 2
...

そしてそれは大丈夫に見えます。Scaladoc上記の引用を適用すると、

fs2.Stream.repeatEval(ref.get)

fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int])

動作は同じである必要があります。更新されたプログラムは次のとおりです。

要素の発行と空の fs2.Stream とのマージ

import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int]).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}

プログラムの出力は

Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...

質問:fs2.Streamの変更をマージすると、プログラムの動作が変更され、元の要素が複製されるのはなぜfs2.Streamですか?

4

2 に答える 2