11

一定のスタックとヒープ空間で State モナドで折り畳みを実行することは可能ですか? または、別の機能的手法が私の問題により適していますか?

次のセクションでは、問題と動機付けとなるユース ケースについて説明します。私は Scala を使用していますが、Haskell でのソリューションも歓迎します。


Stateモナドのフォールドがヒープを埋める

Scalaz 7 を仮定します。 State モナドのモナド折り畳みを考えます。スタック オーバーフローを避けるために、フォールドをトランポリンします。

import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline

type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor

type S = Int  // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad

type R = Int  // or some other monoid

val col: Iterable[R] = largeIterableofRs() // defined elsewhere

val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){ 
    (acc: R, x: R) => StateT[Trampoline, S, R] {
      s: S => Trampoline.done { 
        (s + 1, Monoid[R].append(acc, x))
      }
    }
} run 0 run

// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap.  Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.

大規模なコレクションのcol場合、これでヒープがいっぱいになります。

フォールド中に、コレクション (パラメーター) 内の値ごとにクロージャー (State mobit) が作成されx: R、ヒープがいっぱいになると思います。が実行されるまで、これらはどれも評価できずrun 0、初期状態が提供されます。

この O(n) ヒープの使用を回避できますか?

より具体的には、後で評価するためにクロージャをネストするのではなく、State モナドが各バインド中に実行できるように、フォールドの前に初期状態を提供できますか?

それとも、State モナドの後に遅延実行されるように折り畳みを構築できrunますか? このようにして、次のx: Rクロージャーは、前のクロージャーが評価されてガベージ コレクションに適したものになるまで作成されません。

それとも、この種の作業のためのより優れた機能パラダイムはありますか?


適用例

しかし、おそらく私は仕事に間違ったツールを使用しています。ユースケースの例の進化は次のとおりです。私はここで間違った道をさまよっていますか?

リザーバー サンプリング、つまり、k大きすぎてメモリに収まらないコレクションから均一なランダム アイテムを1 回のパスで選択することを検討してください。Scala では、そのような関数は次のようになります。

def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]

そして、タイプにポンピングされた場合、TraversableOnceこのように使用できます

val tenRandomInts = (Int.Min to Int.Max) sample 10

によって行われる作業sampleは、基本的に次のfoldとおりです。

def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
    col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}

ただし、updateステートフルです。nそれは、すでに見たアイテムの数に依存します。(これは RNG にも依存しますが、簡単にするために、それはグローバルでステートフルであると仮定します。処理に使用される手法nは自明に拡張されます。) では、この状態をどのように処理するのでしょうか?

不純なソリューションはシンプルで、一定のスタックとヒープで実行されます。

/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
    var n = 0
    def apply(sample: Vector[A], x: A): Vector[A] = {
        n += 1
        algorithmR(k, n, acc, x)
    }
}

def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
        sample :+ x // must keep first k elements
    } else {
        val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
        if (r <= k)
            sample.updated(r - 1, x) // sample is 0-index
        else
            sample
    }
}

しかし、純粋に機能的なソリューションはどうでしょうか? update追加のパラメーターとして取りn、更新されたサンプルと共に新しい値を返す必要があります。n暗黙的な状態にフォールド アキュムレータを含めることができます。

(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2

しかし、それは意図をあいまいにします。サンプルベクトルを累積するだけです。この問題は、State モナドとモナドの左折畳みのために用意されているようです。もう一度試してみましょう。

これらのインポートで Scalaz 7 を使用します

import scalaz._
import Scalaz._
import scalaz.std.iterable_

Iterable[A]Scalaz は a のモナド畳み込みをサポートしていないため、an を操作しTraversableます。

sample現在定義されています

// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {       
    type M[B] = State[Int, B]

    // foldLeftM is implemented using foldRight, which must reverse `col`, blowing
    // the heap for large `col`.  Ignore this issue for now.
    // foldLeftM could be implemented differently or we could switch to
    // foldRightM, implemented using foldLeft.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}

更新はどこにありますか

// update using State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
}

残念ながら、これは大規模なコレクションのスタックを吹き飛ばします。

それではトランポリンしましょう。sample今でしょ

// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
    import Free.Trampoline

    type TrampolinedState[S, B] = StateT[Trampoline, S, B]
    type M[B] = TrampolinedState[Int, B]

    // Same caveat about foldLeftM using foldRight and blowing the heap
    // applies here.  Ignore for now. This solution blows the heap anyway;
    // let's fix that issue first.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}

更新はどこにありますか

// update using trampolined State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
        n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
    }
}

これにより、スタック オーバーフローが修正されますが、非常に大きなコレクション (または非常に小さなヒープ) のヒープは依然として吹き飛ばされます。コレクション内の値ごとに 1 つの匿名関数がフォールド中に作成され (各パラメーターを閉じると思いx: Aます)、トランポリンが実行される前にヒープを消費します。(FWIW、State バージョンにもこの問題があります。スタック オーバーフローは、小さなコレクションで最初に表面化するだけです。)

4

2 に答える 2

7

私たちの本当の問題は、実行されていない State mobits によって使用されるヒープです。

いいえそうではありません。本当の問題は、コレクションがメモリに収まらず、コレクション全体foldLeftMfoldRightM強制することです。不純なソリューションの副作用は、実行中にメモリを解放することです。「純粋に機能的な」ソリューションでは、どこでもそれを行っていません。

の使用は、実際のコレクションの種類、その要素がどのように作成され、どのように破棄されるかIterableという重要な詳細を無視しています。colそして、必然的に、foldLeftMon Iterable. 厳しすぎる可能性があり、コレクション全体をメモリに強制しています。たとえば、 の場合、これまでに強制されたすべての要素Streamを保持している限り、メモリ内に保持されます。要素を記憶しないcol他の種類の怠け者である場合、折り畳みは依然として厳しすぎます。Iterable

同じ「実行されていない状態のモビット」が明らかにあるにもかかわらず、最初の例を試してみましたEphemeralStreamが、大きなヒープ圧力は見られませんでした。違いは、 anEphemeralStreamの要素が弱く参照され、foldRightストリーム全体を強制しないことです。

を使用した場合、 2 番目の引数Foldable.foldrが遅延している関数でフォールドするため、問題のある動作は見られないのではないかと思います。フォールドを呼び出すと、すぐに次のようなサスペンションが返されるようにする必要があります。

Suspend(() => head |+| tail.foldRightM(...))

トランポリンが最初の一時停止を再開し、次の一時停止まで実行されると、一時停止間のすべての割り当てが使用可能になり、ガベージ コレクターによって解放されます。

次のことを試してください。

def foldM[M[_]:Monad,A,B](a: A, bs: Iterable[B])(f: (A, B) => M[A]): M[A] =
  if (bs.isEmpty) Monad[M].point(a)
  else Monad[M].bind(f(a, bs.head))(fax => foldM(fax, bs.tail)(f))

val MS = StateT.stateTMonadState[Int, Trampoline]
import MS._

foldM[M,R,Int](Monoid[R].zero, col) {
  (x, r) => modify(_ + 1) map (_ => Monoid[R].append(x, r))
} run 0 run

これは、トランポリンされたモナドの場合は一定のヒープで実行されますが、トランポリンされてMいないモナドの場合はスタックをオーバーフローします。

しかし、実際の問題は、Iterable大きすぎてメモリに収まらないデータを適切に抽象化できないことです。確かに、繰り返しごとに明示的に要素を破棄するか、遅延右折畳みを使用する命令型の副作用プログラムを作成できます。そのプログラムを別のプログラムで構成するまで、これはうまく機能します。Stateそもそもモナドでこれを行うことを調査している理由は、合成性を獲得するためだと思います。

それで、あなたは何ができますか?以下にいくつかのオプションを示します。

  1. Reducer、 、およびそれらの構成を利用し、最後のステップMonoidとして命令型の明示的に解放するループ (またはトランポリンされた遅延右フォールド) を実行します。その後、構成は不可能または予期されません。
  2. Iterateeコンポジションとモナドを使用Enumeratorしてそれらを養います。
  3. Scalaz-Streamを使用して合成ストリーム トランスデューサを記述します。

これらのオプションの最後のオプションは、一般的なケースで使用および推奨するオプションです。

于 2013-12-25T00:15:56.520 に答える
1

State、または同様のモナドを使用することは、問題への良いアプローチではありません。を使用するStateと、大規模なコレクションでスタック/ヒープを吹き飛ばすことが非難されます。大規模なコレクションから構築された値を考えてみましょうx: State[A,B](たとえば、それを折りたたむことによって)。次にx、初期状態のさまざまな値で評価できA、さまざまな結果が得られます。したがってx、コレクションに含まれるすべての情報を保持する必要があります。純粋な設定では、xスタック/ヒープを吹き飛ばさないようにいくつかの情報を忘れることはできません。そのため、結果が評価された後にのみ発生するモナド値全体が解放されるまで、計算されたものはすべてメモリに残ります。したがって、 のメモリ消費量はxコレクションのサイズに比例します。

この問題への適切なアプローチは、機能的なiteratees/pipes/conduitsを使用することだと思います。この概念 (これら 3 つの名前で呼ばれます) は、一定のメモリ消費量で大量のデータ コレクションを処理し、単純なコンビネータを使用してそのようなプロセスを記述するために考案されました。

Scalaz' を使用しようとしましIterateesたが、この部分はまだ成熟していないようで、スタック オーバーフローに悩まされてStateいます (または、正しく使用していない可能性があります。興味のある方は、コードをここで入手できます)。

ただし、私の(まだ少し実験的な) scala-conduitライブラリを使用すると簡単でした(免責事項:私は著者です):

import conduit._
import conduit.Pipe._

object Run extends App {
  // Define a sampling function as a sink: It consumes
  // data of type `A` and produces a vector of samples.
  def sampleI[A](k: Int): Sink[A, Vector[A]] =
    sampleI[A](k, 0, Vector())

  // Create a sampling sink with a given state. It requests
  // a value from the upstream conduit. If there is one,
  // update the state and continue (the first argument to `requestF`).
  // If not, return the current sample (the second argument).
  // The `Finalizer` part isn't important for our problem.
  private def sampleI[A](k: Int, n: Int, sample: Vector[A]):
                  Sink[A, Vector[A]] =
    requestF((x: A) => sampleI(k, n + 1, algorithmR(k, n + 1, sample, x)),
             (_: Any) => sample)(Finalizer.empty)


  // The sampling algorithm copied from the question.
  val rand = new scala.util.Random()

  def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
      sample :+ x // must keep first k elements
    } else {
      val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
      if (r <= k)
        sample.updated(r - 1, x) // sample is 0-index
      else
        sample
    }
  }

  // Construct an iterable of all `short` values, pipe it into our sampling
  // funcition, and run the combined pipe.
  {
    print(runPipe(Util.fromIterable(Short.MinValue to Short.MaxValue) >->
          sampleI(10)))
  }
}

更新:を使用して問題を解決することは可能ですが、定数スペースを実行する方法を知っているStateため、特にカスタム フォールドを実装する必要があります。State

import scala.collection._
import scala.language.higherKinds
import scalaz._
import Scalaz._
import scalaz.std.iterable._

object Run extends App {
  // Folds in a state monad over a foldable
  def stateFold[F[_],E,S,A](xs: F[E],
                            f: (A, E) => State[S,A],
                            z: A)(implicit F: Foldable[F]): State[S,A] =
    State[S,A]((s: S) => F.foldLeft[E,(S,A)](xs, (s, z))((p, x) => f(p._2, x)(p._1)))


  // Sample a lazy collection view
  def sampleS[F[_],A](k: Int, xs: F[A])(implicit F: Foldable[F]):
                  State[Int,Vector[A]] =
    stateFold[F,A,Int,Vector[A]](xs, update(k), Vector())

  // update using State monad
  def update[A](k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
  }

  def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = ...

  {
    print(sampleS(10, (Short.MinValue to Short.MaxValue)).eval(0))
  }
}
于 2013-12-25T21:04:17.430 に答える