2

scalaz-stream を使用して、コストのかかる操作でデータのストリームを処理しようとしています※。

scala> :paste
// Entering paste mode (ctrl-D to finish)

    def expensive[T](x:T): T = {
      println(s"EXPENSIVE! $x")
      x
    }
    ^D
// Exiting paste mode, now interpreting.

expensive: [T](x: T)T

※はい、はい、副作用のあるコードを混在させることは、関数型プログラミング スタイルとしてよくないことだとわかっています。print ステートメントは、高価な() が呼び出された回数を追跡するためのものです。)

コストのかかる操作にデータを渡す前に、データをチャンクに分割する必要があります。

scala> val chunked: Process[Task,Vector[Int]] = Process.range(0,4).chunk(2)
chunked: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await(scalaz.concurrent.Task@7ef516f3,<function1>,Emit(SeqView(...),Halt(scalaz.stream.Process$End$)),Emit(SeqView(...),Halt(scalaz.stream.Process$End$)))

scala> chunked.runLog.run
res1: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())

次に、コストのかかる操作をチャンクのストリームにマップします。

scala> val processed = chunked.map(expensive)
processed: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await(scalaz.concurrent.Task@7ef516f3,<function1>,Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)),Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)))

これを実行すると、expected() が予想される回数だけ呼び出されます。

scala> processed.runLog.run
EXPENSIVE! Vector(0, 1)
EXPENSIVE! Vector(2, 3)
EXPENSIVE! Vector()
res2: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())

ただし、zipWithIndex への呼び出しをチェーンすると、expensive() が何度も呼び出されます。

>scala processed.zipWithIndex.runLog.run
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0, 1)
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2, 3)
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
res3: scala.collection.immutable.IndexedSeq[(Vector[Int], Int)] = Vector((Vector(0, 1),0), (Vector(2, 3),1), (Vector(),2))

これはバグですか?それが望ましい動作である場合、誰かが理由を説明できますか? cost() に長い時間がかかる場合、呼び出し回数が少ない結果を好む理由がわかります。

より多くの例を含む要点は次のとおりです: https://gist.github.com/underspecified/11279251

4

1 に答える 1

2

この問題は、さまざまな形で発生する可能性があります。問題は本質的に、結果を構築している間にmap実行されている中間ステップを確認 (および実行) できることです。chunk

この動作は将来変更される可能性がありますが、それまでの間、いくつかの回避策が考えられます。最も簡単な方法の 1 つは、高価な関数をプロセスでラップし、flatMap代わりに使用することmapです。

chunked.flatMap(a =>
  Process.eval(Task.delay(expensive(a)))
).zipWithIndex.runLog.run

別の解決策は、高価な関数を効果的なチャネルにラップすることです。

def expensiveChannel[A] = Process.constant((a: A) => Task.delay(expensive(a)))

今、あなたは使用することができますthrough:

chunked.through(expensiveChannel).zipWithIndex.runLog.run

現在の動作は少し驚くかもしれませんが、関心のあるすべての効果を追跡するために型システムを使用する必要があることを思い出してください (長時間実行される計算もその 1 つです)。

于 2014-04-25T11:40:04.583 に答える