2

以下のコードは、クライアントにストリーミングを戻します。私が収集するのは、JavaのIOストリームを使用するよりも慣用的な方法です。ただし、問題があります。ストリームが実行された後、接続が開いたままになります。

def getImage() = Action { request =>
  val imageUrl = "http://hereandthere.com/someimageurl.png"
  Ok.stream({ content: Iteratee[Array[Byte], Unit] => 
    WS.url(imageUrl).withHeaders("Accept"->"image/png").get { response => content }
    return
  }).withHeaders("Content-Type"->"image/png")
}

これは、内部APIからリクエスターに大きな(> 1 mb)ファイルをストリーミングすることを目的としています。

問題は、なぜ接続を開いたままにするのかということです。アップストリームサーバーに期待するものはありますか?curlを使用してアップストリームサーバーをテストしましたが、接続が閉じられます。このプロキシを通過したときに接続が閉じられません。

4

3 に答える 3

4

ストリームが終了しない理由は、WS.get()呼び出しから返されるiterateeにEOFが送信されないためです。この明示的なEOFがない場合、接続は開いたままになります-チャンクモードであるため、長時間実行される彗星のような接続になる可能性があります。

修正されたコードは次のとおりです。

Ok.stream({ content: Iteratee[Array[Byte], Unit] => 
  WS.url(imageUrl)
    .withHeaders("Accept"->"image/png")
    .get { response => content }
    .onRedeem { ii =>
       ii.feed(Input.EOF)
    }
}).withHeaders("Content-Type"->"image/png")
于 2012-10-14T16:45:48.757 に答える
1

これがplay2.1.0の修正バージョンです。https://groups.google.com/forum/#!msg/play-framework/HwoRR-nipCc/gUKs9NexCx4Jを参照してください

共有してくれたAnatolyGに感謝します。

def proxy = Action {

   val url = "..."

   Async {
     val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]
     val resultPromise = Promise[ChunkedResult[Array[Byte]]]

     WS.url(url).get { responseHeaders =>
       resultPromise.success {
         new Status(responseHeaders.status).stream({ content: Iteratee[Array[Byte], Unit] =>
           iterateePromise.success(content)
         }).withHeaders(
           "Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
           "Connection" -> "Close")
       }
       Iteratee.flatten(iterateePromise.future)
     }.onComplete {
       case Success(ii) => ii.feed(Input.EOF)
       case Failure(t) => resultPromise.failure(t)
     }

     resultPromise.future
   }

}
于 2013-10-01T15:02:58.827 に答える
1

play 2.2.xのアップデート:

def proxy = Action.async {
  val url = "http://localhost:9000"

  def enumerator(chunks: Iteratee[Array[Byte], Unit] => _) = {
    new Enumerator[Array[Byte]] {
      def apply[C](i: Iteratee[Array[Byte], C]): Future[Iteratee[Array[Byte], C]] = {
        val doneIteratee = Promise[Iteratee[Array[Byte], C]]()
        chunks(i.map {
          done =>
            doneIteratee.success(Done[Array[Byte], C](done)).asInstanceOf[Unit]
        })
        doneIteratee.future
      }
    }
  }

  val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]()
  val resultPromise = Promise[SimpleResult]()

  WS.url(url).get {
    responseHeaders =>

      resultPromise.success(new Status(responseHeaders.status).chunked(
        enumerator({
          content: Iteratee[Array[Byte], Unit] => iterateePromise.success(content)
        }
        )).withHeaders(
        "Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
        "Connection" -> "Close"))

      Iteratee.flatten(iterateePromise.future)
  }.onComplete {
    case Success(ii) => ii.feed(Input.EOF)
    case Failure(t) => throw t
  }

  resultPromise.future
}

誰かがより良い解決策を持っているなら、それは私に非常に興味があります!

于 2013-11-28T20:25:01.730 に答える