0

チャンクでアクターに到着するデータがあり、それらのチャンクをPlay のストリームとして返したいと考えていますResult。から応答を取得する唯一の方法Ok.streamは理想的な候補のように見えるため、次のようになります。

Action.async { request =>
  (source ? GetStream()).map {
     case enumerator => Ok.stream(enumerator)
  }
}

アクターからを返し、Enumerator[Array[Byte]]メッセージがアクターに到着すると、アクター内で列挙子にチャンクをプッシュし続けます。ただし、アクターから変更可能な Enumerator を返すことは、何らかの違反のように思えます。

これを達成するためのより適切な方法はありますか?akka-stream問題空間に対処する可能性のある抽象化であるか、抽象化であると考えましakka.ioたが、それらがどのように適用されるかわかりません。

4

1 に答える 1

0

より良い解決策が提案されるまで私が解決した解決策は、ActorDSL を使用して、非アクター呼び出し元のコンテキストで列挙子をキャプチャすることです。

case GET(p"/stream/$streamId") => Action.async { request =>
    val (enumerator, channel) = Concurrent.broadcast[Array[Byte]]

    actor(new Act {

      storage ! Get(streamId)
      become {
        case DataStart(id, parts, bytes) =>
          sender() ! DataAck(id)
          become {
            case DataPart(_, i, b) =>
              channel.push(b.toArray)
            case DataEnd(_) =>
              channel.eofAndEnd()
          }
      }
    })

    Ok.stream(enumerator).as("text/plain")
  }

アクターからの戻り値と列挙子に対する議論は、それが変更可能でシリアル化できないということです。ただし、一連のメッセージを受信して​​列挙子にフィードするには、アクターが必要です。DSL を介してアクターを作成することにより、アクターは呼び出し元のコンテキストに明示的に埋め込まれます。そのため、シリアライゼーションの境界を越えて列挙子がリークするリスクはありません。

于 2015-07-24T16:02:32.793 に答える