3

DB のデータにアクセスするために anorm を使用しています。DB は、Java で作成された別のサービスを使用して書き込まれ、ebean を使用して永続化されます。

次のscalaオブジェクトがあります

import java.sql.Connection

import scala.concurrent.{ Future, blocking, future }
import scala.concurrent.ExecutionContext.Implicits.global

import anorm.{ SQL, SqlQuery, SqlRow, sqlToSimple, toParameterValue }
import play.api.Logger
import play.api.Play.current
import play.api.db.DB

object Queries {

  private val readDataSource: String = play.Configuration.root().getString("data.provider.api.source", "default")
  //better IO execution context

  import play.api.libs.concurrent.Execution.Implicits.defaultContext

  private val dataSetDescription: SqlQuery = SQL("SELECT DISTINCT platform, name FROM data_nugget")

  private val identityCreationTime: SqlQuery = SQL("SELECT i.creation_time FROM identity i WHERE platform = {pfm} AND userid = {uid};")

  private val identityData: SqlQuery = SQL("SELECT n.name, n.value FROM data_nugget n WHERE platform = {pfm} AND userid = {uid};")

  private val playerData: SqlQuery = SQL("SELECT n.platform, n.name, n.value, r.userid, r.registration_time FROM data_nugget n JOIN registration r ON n.platform=r.platform AND n.userid=r.userid  WHERE r.playerid = {pid} AND r.application = {app};")

  private def withAsyncAnormConnection(function: Connection => Stream[SqlRow]): Future[List[SqlRow]] = {
    future {
      blocking {
        DB.withConnection(readDataSource)(c => function(c)).toList
      }
    }
  }

  def fetchDistinctDataNames(): Future[List[SqlRow]] = {
    withAsyncAnormConnection(implicit c => dataSetDescription())
  }

  def fetchIdentityCreationTime(platform: String, userid: String): Future[List[SqlRow]] = {
    withAsyncAnormConnection(implicit c => identityCreationTime.on("pfm" -> platform, "uid" -> userid)())
  }

  def fetchIdentityData(platform: String, userid: String): Future[List[SqlRow]] = {
    withAsyncAnormConnection(implicit c => identityData.on("pfm" -> platform, "uid" -> userid)())
  }

  def fetchRegistrationData(game: String, playerid: String): Future[List[SqlRow]] = {
    withAsyncAnormConnection(implicit c => playerData.on("app" -> game, "pid" -> playerid)())
  }

}

これを使用して、SQL クエリの実行を先物内にラップします。

これらのクエリを実行するたびに、次のスタック トレースでエラーが発生します。

(Error,com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1073)
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:987)
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:982)
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:927)
com.mysql.jdbc.ResultSetImpl.checkClosed(ResultSetImpl.java:794)
com.mysql.jdbc.ResultSetImpl.next(ResultSetImpl.java:7139)
anorm.Sql$$anonfun$resultSetToStream$1.apply(Anorm.scala:527)
anorm.Sql$$anonfun$resultSetToStream$1.apply(Anorm.scala:527)
anorm.Useful$.unfold(Anorm.scala:315)
anorm.Useful$$anonfun$unfold$1.apply(Anorm.scala:317)
anorm.Useful$$anonfun$unfold$1.apply(Anorm.scala:317)
scala.collection.immutable.Stream$Cons.tail(Stream.scala:1078)
scala.collection.immutable.Stream$Cons.tail(Stream.scala:1070)
scala.collection.immutable.Stream.foreach(Stream.scala:548)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:178)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
scala.collection.TraversableLike$class.to(TraversableLike.scala:629)
scala.collection.AbstractTraversable.to(Traversable.scala:105)
scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:243)
scala.collection.AbstractTraversable.toList(Traversable.scala:105)
controllers.dataprovider.data.Queries$$anonfun$withAsyncAnormConnection$1$$anonfun$apply$1.apply(Queries.scala:31)
controllers.dataprovider.data.Queries$$anonfun$withAsyncAnormConnection$1$$anonfun$apply$1.apply(Queries.scala:31)
scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2$$anon$3.block(ExecutionContextImpl.scala:44)
scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:2803)
scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.blockOn(ExecutionContextImpl.scala:41)
scala.concurrent.package$.blocking(package.scala:50)
controllers.dataprovider.data.Queries$$anonfun$withAsyncAnormConnection$1.apply(Queries.scala:30)
controllers.dataprovider.data.Queries$$anonfun$withAsyncAnormConnection$1.apply(Queries.scala:30)
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
scala.concurrent.forkjoin.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1417)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104))

jdbc を使用する以前の Java サービスでそれらに遭遇しましたが、ここでは ResultSet には触れていません。また、接続から受け取った行のストリームからできるだけ早くリストを返しています。

何が起こっている?ResultSet をどこで閉じていますか? 何をリファクタリングしましたか?

注として、このサービスのプロトタイプ (すべてがコントローラーにあるとき) では、SQL("...") をコードに直接、次のように記述していました。

future {
    blocking {
      DB.withConnection(implicit c => {
        SQL("SELECT DISTINCT platform, name FROM data_nugget")().map(row => (row[String]("platform"), row[String]("name"))).toArray
      })
    }
  }

そしてそれはうまくいきました。

PS:スタックトレースとコードの長いコピー/貼り付けで申し訳ありません...詳細にしようとしています。

4

2 に答える 2

2

機能しているコードと機能していないコードには違いがあります。map実際の例では、怠惰なインスタンスを呼び出してStreamRowます。動作しない例では、toListを使用せずに呼び出していmapます。たぶん、ブロック内mapの基盤の完全な処理を強制していてそうではなく、ブロックの外に出て基盤が閉じられるまで怠惰なままです。おそらく、新しいコードを変更して結果をマッピングし (実際のマッピング ロジックではなく、それ自体にマッピングします)、これで何かが修正されるかどうかを確認できます。ResultSetwithConnectiontoListwithConnectionResultSetRow

于 2013-05-14T01:08:47.420 に答える