slick 3.0.0 クエリから返されたデータを db.stream(yourquery) 経由で scalaz-stream にストリーミングしたかったのです。
react-streams.org は、さまざまなライブラリが実装する API とデータフロー モデルを使用しているようです。
scalaz-stream プロセスから洗練されたパブリッシャーに逆流するバック プレッシャーをどのように処理しますか?
slick 3.0.0 クエリから返されたデータを db.stream(yourquery) 経由で scalaz-stream にストリーミングしたかったのです。
react-streams.org は、さまざまなライブラリが実装する API とデータフロー モデルを使用しているようです。
scalaz-stream プロセスから洗練されたパブリッシャーに逆流するバック プレッシャーをどのように処理しますか?
https://github.com/krasserm/streamzをご覧ください
Streamz は scalaz-stream のリソース結合ライブラリです。これにより、Process インスタンスは次のものを消費および生成できます。
私はついに自分の質問に答えました。scalaz-streams キューを使用してストリーミング結果をキューに入れたい場合。
def getData[T](publisher: slick.backend.DatabasePublisher[T],
queue: scalaz.stream.async.mutable.Queue[T], batchRequest: Int = 1): Task[scala.concurrent.Future[Long]] =
Task {
val p = scala.concurrent.Promise[Unit]()
var counter: Long = 0
val s = new org.reactivestreams.Subscriber[T] {
var sub: Subscription = _
def onSubscribe(s: Subscription): Unit = {
sub = s
sub.request(batchRequest)
}
def onComplete(): Unit = {
sub.cancel()
p.success(counter)
}
def onError(t: Throwable): Unit = p.failure(t)
def onNext(e: T): Unit = {
counter += 1
queue.enqueueOne(e).run
sub.request(batchRequest)
}
}
publisher.subscribe(s)
p.future
}
これを使用して実行するrun
と、終了時にクエリのストリーミングが終了したことを意味する Future が取得されます。すべてのデータが到着するまで計算を待機させたい場合は、この Future を構成できます。getData
続行する前にすべてのデータを実行する必要がある場合は、Task に Await を使用して、返された Task オブジェクトで計算を作成することもできます。私がやっていることは、future の完了で構成し、キューをシャットダウンして、scalaz-stream がきれいに終了することを認識できるようにすることです。
Process を返す (user1763729 によって投稿されたものとは) わずかに異なる実装を次に示します。
def getData[T](publisher: DatabasePublisher[T], batchSize: Long = 1L): Process[Task, T] = {
val q = async.boundedQueue[T](10)
val subscribe = Task.delay {
publisher.subscribe(new Subscriber[T] {
@volatile var subscription: Subscription = _
override def onSubscribe(s: Subscription) {
subscription = s
subscription.request(batchSize)
}
override def onNext(next: T) = {
q.enqueueOne(next).attemptRun
subscription.request(batchSize)
}
override def onError(t: Throwable) = q.fail(t).attemptRun
override def onComplete() = q.close.attemptRun
})
}
Process.eval(subscribe).flatMap(_ => q.dequeue)
}