28

質問から始めます:Scala APIを使用Iterateeしてファイルをクラウドストレージにアップロードする方法(私の場合はAzure Blob Storageですが、今は最も重要ではないと思います)

バックグラウンド:

大きなメディアファイル(300 MB以上)をAzureとして保存するには、入力を約1MBのブロックにチャンクする必要がありますBlockBlobs。残念ながら、私のScalaの知識はまだ不十分です(私のプロジェクトはJavaベースであり、Scalaの唯一の用途はUploadコントローラーです)。

私はこのコードで試しました:なぜBodyParserのIterateeで呼び出しエラーが発生したり、Play Framework2.0でリクエストがハングしたりするのですか?(としてInput Iteratee)-それは非常にうまく機能しますがElement、私が使用できるものはそれぞれ8192バイトのサイズであるため、数百メガバイトのファイルをクラウドに送信するには小さすぎます。

それは私にとってまったく新しいアプローチであり、おそらく私は何かを誤解していると言わなければなりません(私がすべてを誤解しているとは言いたくないです;>)

そのトピックに役立つヒントやリンクをいただければ幸いです。同様の使用法のサンプルがあれば、それは私がアイデアを得るのに最適なオプションです。

4

4 に答える 4

35

基本的に、最初に必要なのは、入力をより大きなチャンク (1024 * 1024 バイト) として再チャンクすることです。

まず、Iteratee最大 1m のバイトを消費する を用意します (最後のチャンクを小さくしても問題ありません)。

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()

Enumerateeそれを使用して、 grouped と呼ばれる API を使用して、チャンクを再グループ化する (アダプター) を構築できます。

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

ここで grouped は を使用してIteratee、各チャンクに入れる量を決定します。そのために、consumeAMB を使用します。つまり、結果は、Enumeratee入力をArray[Byte]1MB に再チャンクすることになります。

BodyParser次に、メソッドを使用してIteratee.foldMバイトの各チャンクを送信する を記述する必要があります。

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  }

foldM は状態を渡し、渡された関数でそれを使用して(S,Input[Array[Byte]]) => Future[S]状態の新しい Future を返します。foldM は、Futureが完了して利用可能な入力のチャンクができるまで、関数を再度呼び出しません。

そして、body パーサーは入力を再チャンクし、それをストアにプッシュします。

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

右を返すことは、本文の解析 (ここではたまたまハンドラー) の最後までに本文を返すことを示します。

于 2012-08-15T03:12:16.250 に答える
3

あなたの目標が S3 にストリーミングすることである場合、実装してテストしたヘルパーを次に示します。

def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]])
                (implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = {
  import scala.collection.JavaConversions._

  val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  val initResponse = s3.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped {
    Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume()
  }

  val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) { case (etags, bytes) =>
    val uploadRequest = new UploadPartRequest()
      .withBucketName(bucket)
      .withKey(key)
      .withPartNumber(etags.length + 1)
      .withUploadId(uploadId)
      .withInputStream(new ByteArrayInputStream(bytes))
      .withPartSize(bytes.length)

    val etag = Future { s3.uploadPart(uploadRequest).getPartETag }
    etag.map(etags :+ _)
  }

  val futETags = enum &> rechunker |>>> uploader

  futETags.map { etags =>
    val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
    s3.completeMultipartUpload(compRequest)
  }.recoverWith { case e: Exception =>
    s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
    Future.failed(e)
  }

}
于 2014-09-19T07:57:12.700 に答える
0

以下を構成ファイルに追加します

play.http.parser.maxMemoryBuffer=256K

于 2016-01-11T23:02:17.070 に答える
0

このストリーミングの問題の解決策も見つけようとしている人は、まったく新しい BodyParser を作成する代わりに、 parse.multipartFormData で既に実装されているものを使用することもできます。以下のようなものを実装して、デフォルトのハンドラーhandleFilePartAsTemporaryFileを上書きできます。

def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = {
  handleFilePart {
    case FileInfo(partName, filename, contentType) =>

      (rechunkAdapter &>> writeToS3).map {
        _ =>
          val compRequest = new CompleteMultipartUploadRequest(...)
          amazonS3Client.completeMultipartUpload(compRequest)
          ...
      }
  }
}

def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)

これを機能させることはできますが、アップロード プロセス全体がストリーミングされるかどうかはまだわかりません。大きなファイルをいくつか試してみましたが、クライアント側からファイル全体が送信されたときにのみ S3 アップロードが開始されるようです。

上記のパーサーの実装を見たところ、すべてが Iteratee を使用して接続されているため、ファイルをストリーミングする必要があると思います。誰かがこれについて何らかの洞察を持っている場合、それは非常に役立ちます。

于 2014-08-13T22:10:12.360 に答える