4

jobs_queueMongoDBにコレクションがあります。これは、テーラブル カーソルを使用してポーリングしている上限付きコレクションです。

val cur =
  jobsQueue
    .find(Json.obj("done" -> Json.obj("$ne" -> true)))
    .options(QueryOpts().tailable.awaitData)
    .cursor[JsObject]

cur.enumerate() |>>> Iteratee.foreach { queuedDoc =>
  // do some processing and store the results back in the DB
}

これは通常の Scala から呼び出されるAppため、Akka や Play のラッピングはまったくありません。

App明示的に から抜け出すまで が終了しないようにする最も適切な方法はIteratee.foreach何ですか? また、より単純な (多少エレガントでなくても) 方法があれば、play-iteratees をまったく使用する必要はありません。


PS 私はコレクションがキャップされていることを確認します:

val jobsQueueMaybe = db.collection[JSONCollection]("jobs_queue")
val jobsQueue: JSONCollection =
  jobsQueueMaybe.stats()
    .flatMap {
      case stats if !stats.capped =>
        jobsQueueMaybe.convertToCapped(size = 1024 * 1024, maxDocuments = None)
      case _ =>
        Future(jobsQueueMaybe)
    }
    .recover { case _ => jobsQueueMaybe.createCapped(size = 1024 * 1024, maxDocuments = None) }
    .map { _ => jobsQueueMaybe }

PPS

また、このビットのロジックをどのように設計したか、およびアプローチを再考して実装を少し見直すことでこれを解決する方法についての批判も歓迎します。

4

1 に答える 1

1

現在の回避策として、各反復が Future を返すようにからIteratee.foreachに変更しました。Iteratee.foldMこのようにすると、ReactiveMongo が中断されるまで計算を続行するように強制されるように見えますが、それとは対照的にforeach、終了が早すぎるように見えます。

cur.enumerate() |>>> Iteratee.foldM(()) { (acc, queuedDoc) =>
  // always yield something like Future.successful(acc) or an actual `Future[Unit]`
}

次に、プログラム全体が終了するまで待つ必要があります (これは、何かがstopSignal: ConcurrentLinkedQueue:

while (stopSignal.isEmpty) Thread.sleep(1000)

しかし、それはうまく機能しますが、私はそのソリューションが特に好きではありません.

私の恐れは不当かもしれませんが、これをどのように解決すべきかについて、もう少し信頼できる答えが欲しいです.

于 2015-01-18T13:41:37.237 に答える