0

Source[String, Unit]をストリーミング アクターに接続するにはどうすればよいですか?

https://gist.github.com/whysoserious/96050c6b4bd5fedb6e33StreamingActorの修正版はうまく機能すると思いますが、ピースを接続するのに苦労しています。

と が与えられた場合、変更されたは と接続する必要があるsource: Source[String, Unit]ctx: RequestContext思います。StreamingActoractorRefFactory.actorOf(fromSource(source, ctx))

参考までに、上記の要点:

import akka.actor._
import akka.util.ByteString
import spray.http.HttpEntity.Empty
import spray.http.MediaTypes._
import spray.http._
import spray.routing.{HttpService, RequestContext, SimpleRoutingApp}

object StreamingActor {

  // helper methods

  def fromString(iterable: Iterable[String], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromStringAndCharset(iterable: Iterable[String], ctx: RequestContext, charset: HttpCharset): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromByteArray(iterable: Iterable[Array[Byte]], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromByteString(iterable: Iterable[ByteString], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromHttpData(iterable: Iterable[HttpData], ctx: RequestContext): Props = {
    Props(new StreamingActor(iterable, ctx))
  }

  // initial message sent by StreamingActor to itself
  private case object FirstChunk

  // confirmation that given chunk was sent to client
  private case object ChunkAck

}

class StreamingActor(chunks: Iterable[HttpData], ctx: RequestContext) extends Actor with HttpService with ActorLogging {

  import StreamingActor._

  def actorRefFactory = context

  val chunkIterator: Iterator[HttpData] = chunks.iterator

  self ! FirstChunk

  def receive = {

    // send first chunk to client
    case FirstChunk if chunkIterator.hasNext =>
      val responseStart = HttpResponse(entity = HttpEntity(`text/html`, chunkIterator.next()))
      ctx.responder ! ChunkedResponseStart(responseStart).withAck(ChunkAck)

    // data stream is empty. Respond with Content-Length: 0 and stop
    case FirstChunk =>
      ctx.responder ! HttpResponse(entity = Empty)
      context.stop(self)

    // send next chunk to client  
    case ChunkAck if chunkIterator.hasNext =>
      val nextChunk = MessageChunk(chunkIterator.next())
      ctx.responder ! nextChunk.withAck(ChunkAck)

    // all chunks were sent. stop.  
    case ChunkAck =>
      ctx.responder ! ChunkedMessageEnd
      context.stop(self)

    //   
    case x => unhandled(x)
  }

}
4

1 に答える 1

2

StreamingActora の使用は、解決しようとしている根本的な問題を過度に複雑にしていると思います。さらに、問題の StreamingActor はHttpResponse、単一のHttpRequest. HttpEntity.Chunkedこれは、データ ストリーム ソースのエンティティとしてを含む 1 つの HttpReponse を単純に返すことができるため、非効率的です。

一般的な同時実行設計

アクターは状態のためのものです。たとえば、接続間で実行中のカウンターを維持します。それでも、Agent型チェックの追加の利点で多くの領域をカバーします (実行時に配信不能メールボックスを唯一の型チェッカーに変える Actor.receive とは異なります)。

状態ではなく同時計算は、(順番に)次のように処理する必要があります。

  1. 最初の考慮事項としての先物: 構成可能、コンパイル時のタイプ セーフ チェック、およびほとんどの場合に最適な選択。

  2. akka Streams : 構成可能で、コンパイル時のタイプ セーフ チェックであり、非常に便利ですが、便利なバックプレッシャー機能により多くのオーバーヘッドが発生します。以下に示すように、Steam は HttpResponse エンティティの形成方法でもあります。

CSV ファイルのストリーミング

根本的な問題は、Streams を使用して csv ファイルを http クライアントにストリーミングする方法です。データ ソースを作成し、それを HttpResponse 内に埋め込むことから始めることができます。

def lines() = scala.io.Source.fromFile("DataFile.csv").getLines()

import akka.util.ByteString
import akka.http.model.HttpEntity

def chunkSource : Source[HttpEntity.ChunkStreamPart, Unit] = 
  akka.stream.scaladsl.Source(lines)
                      .map(ByteString.apply)
                      .map(HttpEntity.ChunkStreamPart.apply)

def httpFileResponse = 
  HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, chunkSource))

その後、すべてのリクエストに対して次のレスポンスを提供できます。

val fileRequestHandler = {
  case HttpRequest(GET, Uri.Path("/csvFile"), _, _, _) => httpFileResponse
}   

次に、fileRequestHandler をサーバー ルーティング ロジックに埋め込みます。

于 2015-11-05T13:38:28.050 に答える