15

基本的に私はこれを変換したい:

def data(block: T => Unit)

ストリームへ(dataToStreamは、この変換を行う架空の関数です):

val dataStream: Stream[T] = dataToStream(data)

この問題は継続することで解決できると思います。

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// very dumb solution is to collect all data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it
dataList.toStream

// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// and here a black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know how dataToStream function could look like?

ありがとう、Dawid

4

4 に答える 4

11

EDITED: traversable.view の遅延を示すように例を修正しました

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

toTraversable メソッドは、データ関数を Traversable コレクションに変換します。それ自体はそれほど大きなものではありませんが、これをレイジーな TraversableView に変換できます。次に例を示します。

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

take メソッドの残念な性質は、正しく機能するために最後に生成された値の 1 つ前に移動する必要があることですが、早期に終了してしまいます。上記のコードは、「.view」呼び出しがなくても同じように見えます。ただし、より説得力のある例を次に示します。

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3

結論として、あなたが探しているコレクションは TraversableView であると思います。これは、Traversable を作成してから "view" を呼び出すビューを作成するのが最も簡単です。本当に Stream タイプが必要な場合は、2.8.0.final で機能し、スレッドなしで「ストリーム」を作成するメソッドを次に示します。

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

このメソッドの残念な性質は、ストリームを作成する前にトラバース可能全体を反復処理することです。これは、すべての値をメモリにバッファリングする必要があることも意味します。唯一の代替手段は、スレッドに頼ることです。

余談ですが、これが Traversable を scalax.io.File メソッドからの直接の戻り値として好む動機となった理由です: "lines" "chars" および "bytes"。

于 2010-09-28T18:45:32.203 に答える
3

データを消費するスレッドを生成する簡単なソリューションを次に示します。データを SynchronousQueue にポストします。キューからデータをプルするストリームが作成され、返されます。

 def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
  }   
  new Thread(callbackthread).start()
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}   
于 2010-09-26T18:15:00.973 に答える
2

これは、@ Geoff Reedy の提供物から適応された、区切られた継続ベースの実装です。

import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
    val queue = new SynchronousQueue[Option[A]]
    queue.put(Some(shift { k: (A=>Unit) =>
        new Thread() { 
            override def run() {
                data(k)
                // when (if) the data source stops pumping, add None 
                // to signal that the stream is dead
                queue.put(None)
            }
        }.start()
        continually(queue.take).takeWhile(_.isDefined).map(_.get)
    })
}
于 2010-09-27T19:44:43.263 に答える
2

私はまだそれを自分で行う方法を見つけなければなりません。答えはここのどこかにあると思います:

編集:別の問題を解決する方法を示すコードを削除しました。

Edit2:最初に投稿されたコードhttp://gist.github.com/580157を使用してhttp://gist.github.com/574873、これを行うことができます:

object Main {
  import Generator._

  def data = generator[Int] { yld =>
    for (i <- suspendable(List.range(0, 11))) yld(i)
  }

  def main(args: Array[String]) {
    for( i <- data.toStream ) println(i)
  }
}

dataはブロックコードを取りませんが、これでいいと思います。なぜなら、継続により、呼び出し側でブロックを処理できるからです。Generator のコードは、github の gist で確認できます。

于 2010-09-26T15:18:56.013 に答える