2

無限のデータ ストリームを消費するプログラムがあります。途中で、単純な合計と平均であるため、モノイドを形成するいくつかのメトリックを記録したいと思います。定期的に、これらのメトリックをどこかに書き出してクリアし、集計に戻りたいと考えています。私は本質的に持っています:

object Foo {
  type MetricsIO[A] = StateT[IO, MetricData, A]

  def recordMetric(m: MetricData): MetricsIO[Unit] = {
    StateT.modify(_.combine(m))
  }

  def sendMetrics: MetricsIO[Unit] = {
    StateT.modifyF { s =>
      val write: IO[Unit] = writeMetrics(s)
      write.attempt.map {
        case Left(_) => s
        case Right(_) => Monoid[MetricData].empty
      }
    }
  }
}

したがって、実行のほとんどはIO直接使用し、使用して持ち上げStateT.liftFます。また、特定の状況では、 への呼び出しをいくつか含めますrecordMetric。その最後に、ストリームがあります:

val mainStream: Stream[MetricsIO, Bar] = ...

そして、定期的に、たとえば毎分程度、メトリックをダンプしたいので、試しました:

val scheduler: Scheduler = ...
val sendStream =
  scheduler
    .awakeEvery[MetricsIO](FiniteDuration(1, TimeUnit.Minutes))
    .evalMap(_ => Foo.sendMetrics)

val result = mainStream.concurrently(sendStream).compile.drain

runそして、開始状態で呼び出すという通常のトップレベルのプログラム処理を行い、次に を呼び出しunsafeRunSyncます。

問題は、空のメトリックしか表示されないことです! 私のモノイドが暗黙的に空のメトリクスを提供していると思われますがsendStream、それがなぜなのか、またはそれを修正する方法がよくわかりません。sendMetrics代わりに、これらの呼び出しをメインストリームに「インターリーブ」できる方法があるでしょうか?

編集:これは最小限の完全な実行可能な例です:

import fs2._
import cats.implicits._
import cats.data._
import cats.effect._
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

val sec = Executors.newScheduledThreadPool(4)
implicit val ec = ExecutionContext.fromExecutorService(sec)

type F[A] = StateT[IO, List[String], A]

val slowInts = Stream.unfoldEval[F, Int, Int](1) { n =>
  StateT(state => IO {
    Thread.sleep(500)
    val message = s"hello $n"
    val newState = message :: state
    val result = Some((n, n + 1))
    (newState, result)
  })
}

val ticks = Scheduler.fromScheduledExecutorService(sec).fixedDelay[F](FiniteDuration(1, SECONDS))

val slowIntsPeriodicallyClearedState = slowInts.either(ticks).evalMap[Int] {
  case Left(n) => StateT.liftF(IO(n))
  case Right(_) => StateT(state => IO {
    println(state)
    (List.empty, -1)
  })
}

今私がする場合:

slowInts.take(10).compile.drain.run(List.empty).unsafeRunSync

次に、期待される結果が得られます-状態が出力に適切に蓄積されます。しかし、もしそうなら:

slowIntsPeriodicallyClearedState.take(10).compile.drain.run(List.empty).unsafeRunSync

その後、一貫して空のリストが出力されます。部分的なリスト (約 2 要素) が出力されると予想していました。

4

1 に答える 1

3

StateT同時アクセスに直面しても安全ではないため、エフェクトタイプで使用するのは安全ではありません。代わりに、Ref(バージョンに応じて、fs2 または cats-effect のいずれかから)を使用することを検討してください。

このようなもの:

def slowInts(ref: Ref[IO, Int]) = Stream.unfoldEval[F, Int, Int](1) { n =>
  val message = s"hello $n"
  ref.modify(message :: _) *> IO {
    Thread.sleep(500)
    val result = Some((n, n + 1))
    result
  }
}

val ticks = Scheduler.fromScheduledExecutorService(sec).fixedDelay[IO](FiniteDuration(1, SECONDS))

def slowIntsPeriodicallyClearedState(ref: Ref[IO, Int] = 
  slowInts.either(ticks).evalMap[Int] {
    case Left(n) => IO.pure(n)
    case Right(_) =>
      ref.modify(_ => Nil).flatMap { case Change(previous, now) => 
        IO(println(now)).as(-1)
      }
  }
于 2018-08-20T13:45:11.217 に答える