12

x値を取り込んで、秒/ミリ秒ごとにチャンク化して出力するPlay2Enumerateeを作成したいと思います。このように、ユーザー入力が多いマルチユーザーWebSocket環境では、1秒あたりの受信フレーム数を制限できます。

次のように、設定された数のアイテムをグループ化できることを知っています。

val chunker = Enumeratee.grouped(
  Traversable.take[Array[Double]](5000) &>> Iteratee.consume()
)

アイテムの数ではなく、時間に基づいてこれを行う組み込みの方法はありますか?

どういうわけか、予定されているAkkaの仕事でこれを行うことを考えていましたが、一見すると非効率的であり、同時性の問題が発生するかどうかはわかりません。

4

2 に答える 2

3

このようにどうですか?これがお役に立てば幸いです。

 package controllers

 import play.api._
 import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.iteratee._
 import play.api.libs.concurrent.Akka
 import play.api.libs.concurrent.Promise

 object Application extends Controller {

   def index = Action {
     val queue = new scala.collection.mutable.Queue[String]
     Akka.future {
       while( true ){
         Logger.info("hogehogehoge")
         queue += System.currentTimeMillis.toString
         Thread.sleep(100)
       }
     }
     val timeStream = Enumerator.fromCallback { () =>
       Promise.timeout(Some(queue), 200)
     }
     Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
       var str = ""
       while(queue.nonEmpty){
         str += queue.dequeue + ", "
       }
       str
     })))
   }

 }

また、このドキュメントも役立ちます。 http://www.playframework.com/documentation/2.0/Enumerators

UPDATE これはplay2.1バージョン用です。

 package controllers

 import play.api._
 import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.iteratee._
 import play.api.libs.concurrent.Akka
 import play.api.libs.concurrent.Promise
 import scala.concurrent._
 import ExecutionContext.Implicits.global

 object Application extends Controller {

   def index = Action {
     val queue = new scala.collection.mutable.Queue[String]
     Akka.future {
       while( true ){
         Logger.info("hogehogehoge")
         queue += System.currentTimeMillis.toString
         Thread.sleep(100)
       }
     }
     val timeStream = Enumerator.repeatM{
       Promise.timeout(queue, 200)
     }
     Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
       var str = ""
       while(queue.nonEmpty){
         str += queue.dequeue + ", "
       }
       str
     })))
   }

 }
于 2013-03-18T00:32:25.863 に答える
2

ここでは、ミリ秒単位で測定された固定時間長tの入力から値を取得する反復子と、そのような長さt内に構築されたセグメントに分割された入力ストリームをグループ化してさらに処理できる列挙子をすばやく定義しました。iterateeが開始されてからどれだけの時間が経過したかを追跡するためにJodaTimeに依存しています。

def throttledTakeIteratee[E](timeInMillis: Long): Iteratee[E, List[E]] = {
  var startTime = new Instant()

  def step(state: List[E])(input: Input[E]): Iteratee[E, List[E]] = {
    val timePassed = new Interval(startTime, new Instant()).toDurationMillis

    input match {
      case Input.EOF => { startTime = new Instant; Done(state, Input.EOF) }
      case Input.Empty => Cont[E, List[E]](i => step(state)(i))
      case Input.El(e) =>
        if (timePassed >= timeInMillis) { startTime = new Instant; Done(e::state, Input.Empty) }
        else Cont[E, List[E]](i => step(e::state)(i))
    }
  }

  Cont(step(List[E]()))
}

def throttledTake[T](timeInMillis: Long) = Enumeratee.grouped(throttledTakeIteratee[T](timeInMillis))
于 2013-03-18T18:31:40.533 に答える