4

slick 3.0.0 クエリから返されたデータを db.stream(yourquery) 経由で scalaz-stream にストリーミングしたかったのです。

react-streams.org は、さまざまなライブラリが実装する API とデータフロー モデルを使用しているようです。

scalaz-stream プロセスから洗練されたパブリッシャーに逆流するバック プレッシャーをどのように処理しますか?

4

3 に答える 3

3

https://github.com/krasserm/streamzをご覧ください

Streamz は scalaz-stream のリソース結合ライブラリです。これにより、Process インスタンスは次のものを消費および生成できます。

  • Apache Camel エンドポイント
  • Akka Persistence ジャーナルとスナップショット ストアと
  • バックプレッシャーを完全にサポートする Akka Stream フロー (リアクティブ ストリーム)
于 2016-01-14T19:59:34.907 に答える
2

私はついに自分の質問に答えました。scalaz-streams キューを使用してストリーミング結果をキューに入れたい場合。

def getData[T](publisher: slick.backend.DatabasePublisher[T],
  queue: scalaz.stream.async.mutable.Queue[T], batchRequest: Int = 1): Task[scala.concurrent.Future[Long]] =
  Task {
    val p = scala.concurrent.Promise[Unit]()
    var counter: Long = 0
    val s = new org.reactivestreams.Subscriber[T] {
      var sub: Subscription = _

      def onSubscribe(s: Subscription): Unit = {
        sub = s
        sub.request(batchRequest)
      }

      def onComplete(): Unit = {
        sub.cancel()
        p.success(counter)
      }

      def onError(t: Throwable): Unit = p.failure(t)

      def onNext(e: T): Unit = {
        counter += 1
        queue.enqueueOne(e).run
        sub.request(batchRequest)
      }
    }
    publisher.subscribe(s)
    p.future
  }

これを使用して実行するrunと、終了時にクエリのストリーミングが終了したことを意味する Future が取得されます。すべてのデータが到着するまで計算を待機させたい場合は、この Future を構成できます。getData続行する前にすべてのデータを実行する必要がある場合は、Task に Await を使用して、返された Task オブジェクトで計算を作成することもできます。私がやっていることは、future の完了で構成し、キューをシャットダウンして、scalaz-stream がきれいに終了することを認識できるようにすることです。

于 2015-03-21T13:36:23.100 に答える
0

Process を返す (user1763729 によって投稿されたものとは) わずかに異なる実装を次に示します。

def getData[T](publisher: DatabasePublisher[T], batchSize: Long = 1L): Process[Task, T] = {
 val q = async.boundedQueue[T](10)

 val subscribe = Task.delay {
  publisher.subscribe(new Subscriber[T] {

    @volatile var subscription: Subscription = _

    override def onSubscribe(s: Subscription) {
      subscription = s
      subscription.request(batchSize)
    }

    override def onNext(next: T) = {
        q.enqueueOne(next).attemptRun
        subscription.request(batchSize)
    }

    override def onError(t: Throwable) = q.fail(t).attemptRun

    override def onComplete() = q.close.attemptRun
  })
 }

 Process.eval(subscribe).flatMap(_ => q.dequeue)
}
于 2016-04-27T14:26:22.457 に答える