18

Iteratee を介してファイルを S3 に送信する可能性についていくつか読んだことがあります。これにより、ファイルを受信したときにファイルの S3 チャンクを送信し、たとえば大きなファイルの OutOfMemory を回避できるようです。

おそらく私がする必要があるこの SO 投稿を見つけました: Play 2.x : Reactive file upload with Iteratees Sadek Brodi は、foldM は Play 2.1 で利用可能であると述べているため、例としてのみ)

Iteratees についてのブログを読んだことがあり、まだ Scala/Play2 の専門家ではない人のために、誰かがこれを簡単に説明できますか?

マルチパート ボディ パーサーなどを使用する必要があるかどうかもわかりませんが、このコードが何をしているのか理解できないことが 1 つあります。

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1028*1028) &>> Iteratee.consume()

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  }

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

ちなみに、従来のJava InputStream / OutputStreamを使用した場合と比較して、メモリ消費量の違いはどうなるでしょうか。私は実際に、Java + AsyncHttpClient + Grizzly を使用して、Iteratees を使用せずに、非常に低いメモリ消費量で非ブロッキングの方法で 500 MB のファイルを S3 に転送できます (ただし、Netty でも動作すると思います)。

では、Iteratee を使用する利点は何でしょうか?

私が見ることができる 1 つの違いは、取得して S3 に転送する InputStream は、私の場合、一時ファイル (これは CXF の動作です) によってサポートされているため、Play Iteratee ほど反応的ではない可能性があることです。

しかし、Iteratee では、Enumerator が接続によって受信されたバイトを生成し、Iteratee を介して S3 に転送する場合、S3 への接続が良好ではなく、バイトを非常に高速に転送できない場合、「保留中」のバイトが格納されます。 ?

4

1 に答える 1

5

簡単な説明?私が試してみます。:)

コンポーネントからパイプラインを構築しています。パイプラインを構築したら、データを送信できます。これはIterateeであるため、データを反復処理する方法を知っています。

アップロードしたいファイルはリクエストボディに含まれており、BodyParser は Play でリクエストボディを処理するものです。したがって、iteratee パイプラインを BodyParser に入れます。リクエストが行われると、パイプラインにデータが送信されます (データが繰り返されます)。

パイプライン ( rechunkAdapter &>> writeToStore) はデータを 1MB ビットにチャンクし、S3 に送信します。

パイプラインの最初の部分 ( rechunkAdapter) はチャンキングを行います。実際には、チャンキングを行う独自のミニパイプラインがあります ( consumeAMB)。ミニパイプは、チャンクを作成するのに十分なデータを受信すると、それをメイン パイプラインに送信します。

パイプラインの 2 番目の部分 ( writeToStore) は、各チャンクで呼び出されるループのようなもので、各チャンクを S3 に送信する機会を与えます。

iteratees の利点は?

何が起こっているかがわかれば、コンポーネントをつなぎ合わせて反復パイプラインを構築できます。また、タイプ チェッカーは、何かを正しく接続していない場合に最も頻繁に通知します。

たとえば、上記のパイプラインを変更して、遅いという事実を修正できます。チャンクが S3 にアップロードする準備が整うたびにリクエストのアップロードが一時停止されるため、おそらく低速です。メモリ不足にならないようにリクエストのアップロードを遅くすることは重要ですが、固定サイズのバッファを追加することでもう少し寛容になることができます。したがってConcurrent.buffer(2)、パイプラインの途中に追加して、最大 2 つのチャンクをバッファリングします。

Iteratee は、ストリームへの機能的なアプローチを提供します。これは、関数型プログラミングについてどのように感じるかによって、長所または短所になります。:) 遅延ストリーム (別の機能的アプローチ) と比較して、反復はリソースの使用を正確に制御します。

最後に、反復により、非常に複雑な非同期ストリーム プログラミングを比較的 (!) 簡単に行うことができます。スレッドを保持せずに IO を処理できるため、スケーラビリティが大幅に向上します。従来の Java InputStream/OutputStream の例では、2 つのスレッドが必要です。

于 2013-11-13T22:59:07.017 に答える