2

scalikejdbc を使用して巨大なテーブルにアクセスしています。私の理解では、すべての行をメモリにフェッチしてから、それらをマップまたは反復できます。

現在、rxscala Observable を使用した実装がありますが、これは非常に単純です。しかし、レシーバーは sql の読み取りよりも遅く、バッファリングのために OutOfMemory が発生します。観察可能な私の現在のプロデューサーは次のとおりです。

  def fetchProductsAsObservable(
    sql: SQL[Nothing,NoExtractor],
    extractor: (WrappedResultSet) => ProductItem)
  ) =
    Observable[ProductItem](o =>
      try {
          sql.foreach(row => o.onNext(extractor(row)))
          o.onCompleted()
      } catch {
        case e: Throwable => o.onError(e)
      }
    )

SQL.foreachメソッドは知っていますが、コールバックメソッドを取得してUnitを返します。私のバックグラウンドは .NET です。並列処理のために提供できる scalikejdbc を使用して、scala で単純な Iterator を適切に実装する方法を自分で理解できませんか?

4

1 に答える 1