0

私はSprayを初めて使用し、サーバーからデータのストリームを受信するクライアントを実装しようとしています. 私の現在のコードは次のようになります。クライアントは HTTP サーバーにリクエストを送信し、HTTP サーバーはデータのストリーム (チャンクされたレスポンスとして) を返します。サーバーに接続し、レスポンスを取得できることを確認しました。

ただし、切断と再接続をどのように処理する必要があるかは明確ではありません。たとえば、(A) ネットワーク接続が失われた場合、または (B) その時点でサーバーに送信するデータがないためにクライアントがタイムアウトした場合などです。ポインタ/例をいただければ幸いです。

アップデート

まず、上記の (A) と (B) のイベントを検出します。クライアントが上記の (A) または (B) のいずれかを経験した場合、接続を再確立し、再認証して続行できるようにする必要があります (connected状態に戻ってデータ ストリームを取得します。

import spray.http._
import spray.client.pipelining._
import akka.actor._
import spray.can.Http
import akka.io.IO
import spray.http.HttpRequest
import spray.http.ChunkedResponseStart
import spray.http.HttpRequest
import spray.http.ChunkedResponseStart

trait Authorization {
  def authorize: HttpRequest => HttpRequest
}

trait OAuthAuthorization extends Authorization {

  import OAuth._

  val consumer = ???
  val token = ???
  val authorize: (HttpRequest) => HttpRequest = OAuthorizer(token, token)
}


class StreamerActor(uri: Uri) extends Actor with ActorLogging {
  this: Authorization =>
  val io = IO(Http)(context.system)

  //Initial state of the Actor
  def receive = ready

  def ready: Receive = {
    case query: String =>
      val body = HttpEntity(ContentType(MediaTypes.`application/x-www-form-urlencoded`), s"$query")
      val req = HttpRequest(HttpMethods.POST, uri = uri, entity = body) ~> authorize
      sendTo(io).withResponsesReceivedBy(self)(req)
      //As soon as you get the data you should change state to "connected" by using a "become"
      context become connected
  }

  def connected: Receive = {
    case ChunkedResponseStart(_) => log.info("Chunked Response started.")
    case MessageChunk(entity, _) => log.info(entity.asString)
    case ChunkedMessageEnd(_, _) => log.info("Chunked Message Ended")
    case Http.Closed => log.info("HTTP closed")
    case _ =>
  }
}

object SprayStreamer extends App {

  val system = ActorSystem("simple-spray-http")
  val Uri = Uri("https://.....")
  val streamClient = system.actorOf(Props(new StreamerActor(Uri) with OAuthAuthorization), name = "spray-client")
  streamClient ! "keyword"

}

これらは私の内容ですresources/application.conf

spray {
  can.server {
    idle-timeout = 90 s
    request-timeout = 80 s
    connection-timeout = 90 s
    reqiest-chunk-aggregation-limit = 0
  }

  can.client {
    idle-timeout = 90 s
    request-timeout = 80 s
    connection-timeout = 90 s
    response-chunk-aggregation-limit = 0
  }

  io.confirm-sends = on

}
4

0 に答える 0