scalikejdbc を使用して巨大なテーブルにアクセスしています。私の理解では、すべての行をメモリにフェッチしてから、それらをマップまたは反復できます。
現在、rxscala Observable を使用した実装がありますが、これは非常に単純です。しかし、レシーバーは sql の読み取りよりも遅く、バッファリングのために OutOfMemory が発生します。観察可能な私の現在のプロデューサーは次のとおりです。
def fetchProductsAsObservable(
sql: SQL[Nothing,NoExtractor],
extractor: (WrappedResultSet) => ProductItem)
) =
Observable[ProductItem](o =>
try {
sql.foreach(row => o.onNext(extractor(row)))
o.onCompleted()
} catch {
case e: Throwable => o.onError(e)
}
)
SQL.foreachメソッドは知っていますが、コールバックメソッドを取得してUnitを返します。私のバックグラウンドは .NET です。並列処理のために提供できる scalikejdbc を使用して、scala で単純な Iterator を適切に実装する方法を自分で理解できませんか?