Source[String, Unit] をストリーミング アクターに接続するにはどうすればよいですか?
StreamingActor (https://gist.github.com/whysoserious/96050c6b4bd5fedb6e33) の修正版はうまく機能すると思いますが、ピースを接続するのに苦労しています。
source: Source[String, Unit] と ctx: RequestContext が与えられた場合、変更された StreamingActor は次のように接続する必要があると思います。
actorRefFactory.actorOf(fromSource(source, ctx))
参考までに、上記の Gist のコード:
import akka.actor._
import akka.util.ByteString
import spray.http.HttpEntity.Empty
import spray.http.MediaTypes._
import spray.http._
import spray.routing.{HttpService, RequestContext, SimpleRoutingApp}
/**
 * Companion of the `StreamingActor` class: `Props` factories for the supported
 * chunk representations, plus the private protocol messages the actor uses.
 */
object StreamingActor {

  /**
   * Builds `Props` for an actor that streams the given strings.
   *
   * @param iterable chunks to stream, converted with the single-argument `HttpData.apply`
   * @param ctx      request context whose responder receives the chunks
   */
  def fromString(iterable: Iterable[String], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }

  /**
   * Builds `Props` for an actor that streams the given strings, encoded with `charset`.
   *
   * @param iterable chunks to stream
   * @param ctx      request context whose responder receives the chunks
   * @param charset  charset used to turn each string into `HttpData`
   */
  def fromStringAndCharset(iterable: Iterable[String], ctx: RequestContext, charset: HttpCharset): Props = {
    // The charset must be passed explicitly; the one-argument HttpData.apply
    // would silently ignore the caller's choice.
    fromHttpData(iterable.map(string => HttpData(string, charset)), ctx)
  }

  /**
   * Builds `Props` for an actor that streams the given byte arrays.
   *
   * @param iterable chunks to stream
   * @param ctx      request context whose responder receives the chunks
   */
  def fromByteArray(iterable: Iterable[Array[Byte]], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }

  /**
   * Builds `Props` for an actor that streams the given `ByteString`s.
   *
   * @param iterable chunks to stream
   * @param ctx      request context whose responder receives the chunks
   */
  def fromByteString(iterable: Iterable[ByteString], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }

  /**
   * Builds `Props` for an actor that streams already-converted `HttpData` chunks.
   * All other factories in this object delegate here.
   *
   * @param iterable chunks to stream
   * @param ctx      request context whose responder receives the chunks
   */
  def fromHttpData(iterable: Iterable[HttpData], ctx: RequestContext): Props = {
    Props(new StreamingActor(iterable, ctx))
  }

  // Initial message sent by StreamingActor to itself.
  private case object FirstChunk

  // Ack attached to each outgoing part; receiving it triggers the next chunk.
  private case object ChunkAck
}
/**
 * Actor that writes `chunks` to `ctx.responder` as a chunked HTTP response,
 * one chunk per acknowledgement.
 *
 * On construction the actor sends itself `FirstChunk`. Each outgoing part has
 * `ChunkAck` attached via `withAck`, and the next chunk is only sent when that
 * ack comes back. The actor stops itself after sending either an empty
 * response (no chunks) or `ChunkedMessageEnd` (all chunks sent).
 *
 * NOTE(review): no case here stops the actor when an ack never arrives (for
 * example if the peer goes away mid-stream) -- confirm whether a
 * connection-closed notification should be handled as well.
 *
 * @param chunks data to stream; its iterator is consumed exactly once
 * @param ctx    request context whose responder receives every response part
 */
class StreamingActor(chunks: Iterable[HttpData], ctx: RequestContext) extends Actor with HttpService with ActorLogging {

  import StreamingActor._

  def actorRefFactory = context

  val chunkIterator: Iterator[HttpData] = chunks.iterator

  // Kick off streaming; the message is handled by `receive` below.
  self ! FirstChunk

  def receive = {
    case FirstChunk =>
      if (chunkIterator.hasNext) {
        // Open the chunked response with the first chunk as its entity.
        val firstEntity = HttpEntity(`text/html`, chunkIterator.next())
        val opening = ChunkedResponseStart(HttpResponse(entity = firstEntity))
        ctx.responder ! opening.withAck(ChunkAck)
      } else {
        // Nothing to stream: answer with an empty entity and stop.
        ctx.responder ! HttpResponse(entity = Empty)
        context.stop(self)
      }

    case ChunkAck =>
      if (chunkIterator.hasNext) {
        // Previous part was acknowledged; send the next one.
        ctx.responder ! MessageChunk(chunkIterator.next()).withAck(ChunkAck)
      } else {
        // Iterator exhausted: close the chunked response and stop.
        ctx.responder ! ChunkedMessageEnd
        context.stop(self)
      }

    case other => unhandled(other)
  }
}