4

私のコードには共通のパターンがあります。データベースからの結果を並べ替えましたが、ネストされた構造でそれらを出力する必要があります。これをストリーミングしたいので、一度にメモリに保持するレコードをできるだけ少なくしたいと考えています。TravesableLike.groupBy を使用すると、データがソートされていないと想定されるため、変更可能なマップが不必要に埋められます。これを真にストリーミングし続けたいと思います。ここで scalaz-stream は役に立ちますか?

val sql = """select grandparent_id, parent_id, child_id
  from children
  where grandparent_id = ?
  order by grandparent_id, parent_id, child_id"""

def elementsR[P, R](invoker: Invoker[P, R], param: P): Process[Task, R] =
  // Invoker.elements returns trait CloseableIterator[+T] extends Iterator[T] with Closeable
  resource(Task.delay(invoker.elements(param)))(
    src => Task.delay(src.close)) { src =>
      Task.delay { if (src.hasNext) src.next else throw End }
  }

def dbWookie {
  // grandparent_id, (grandparent_id, parent_id, child_id)
  val invoker = Q.query[Int, (Int, Int, Int)](sql)
  val es = elementsR(invoker, 42)

  // ?, ?, ?

  // nested emits (42, ((35, (1, 3, 7)), (36, (8, 9, 12))))
}

プロセスに foldLeft や scanLeft などの関数があまりないので、grandparent_id、parent_id、または child_id の変更を検出してグループを発行する方法がわかりません。何か案は?

4

1 に答える 1

6

と同様に機能するものが必要だと思いますchunkBychunkBy述語関数の結果が からtrueに反転するたびに、チャンクを発行しfalseます。

ブール値の比較から、入力の任意の関数の結果の比較まで、これを一般化できます。したがって、入力に適用されるこの関数の値が変化するたびにチャンクを発行するプロセスがあります。

def chunkOn[I, A](f: I => A): Process1[I, Vector[I]] = {
  def go(acc: Vector[I], last: A): Process1[I,Vector[I]] =
    await1[I].flatMap { i =>
      val cur = f(i)
      if (cur != last) emit(acc) then go(Vector(i), cur)
      else go(acc :+ i, cur)
    } orElse emit(acc)
  await1[I].flatMap(i => go(Vector(i), f(i)))
}

Identity モナドを使用してすぐに評価を強制する、REPL での簡単なダーティ テスト:

scala> import scalaz.stream._, scalaz.Id._
import scalaz.stream._
import scalaz.Id._

scala> val rows = Seq(('a, 'b, 'c), ('a, 'b, 'd), ('b, 'a, 'c), ('b, 'd, 'a))
rows: Seq[(Symbol, Symbol, Symbol)] = List(('a,'b,'c), ('a,'b,'d), ('b,'a,'c), ('b,'d,'a))

scala> val process = Process.emitSeq[Id, (Symbol, Symbol, Symbol)](rows)
process: scalaz.stream.Process[scalaz.Id.Id,(Symbol, Symbol, Symbol)] =
  Emit(List(('a,'b,'c), ('a,'b,'d), ('b,'a,'c), ('b,'d,'a)),Halt(scalaz.stream.Process$End$))

scala> process |> chunkOn(_._1)
res4: scalaz.stream.Process[scalaz.Id.Id,scala.collection.immutable.Vector[(Symbol, Symbol, Symbol)]] =
  Emit(List(Vector(('a,'b,'c), ('a,'b,'d))),Emit(List(Vector(('b,'a,'c), ('b,'d,'a))),Halt(scalaz.stream.Process$End$)))

あなたが提案したようにchunkWhen、現在および最後の値に対して述語を使用し、 と評価されたときにチャンクを発行しfalseます。

def chunkWhen[I](f: (I, I) => Boolean): Process1[I, Vector[I]] = {
  def go(acc: Vector[I]): Process1[I,Vector[I]] =
    await1[I].flatMap { i =>
      acc.lastOption match {
        case Some(last) if ! f(last, i) => emit(acc) then go(Vector(i))
        case _ => go(acc :+ i)
      }
    } orElse emit(acc)
  go(Vector())
}

試してみます:

scala> process |> chunkWhen(_._1 == _._1)
res0: scalaz.stream.Process[scalaz.Id.Id,Vector[(Symbol, Symbol, Symbol)]] =
  Emit(List(Vector(('a,'b,'c), ('a,'b,'d))),Emit(List(Vector(('b,'a,'c), ('b,'d,'a))),Halt(scalaz.stream.Process$End$)))
于 2013-09-30T21:24:02.430 に答える