5

akka-httpチャンクされた応答を返す http サービスに要求を行うために使用しています。関連するコードは次のようになります。

val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap { response =>
    response.entity.dataBytes.runForeach { chunk =>
        println("-----")
        println(chunk.utf8String)
    }
}

コマンドラインで生成される出力は次のようになります。

-----
{"data":
-----
"some text"}

-----
{"data":
-----
"this is a longer
-----
text"}

-----
{"data": "txt"}

-----
...

データの論理部分 - この場合の json は行末記号 で終わります\r\nが、問題は、上記の例で明確にわかるように、json が常に単一の http 応答チャンクに収まるとは限らないことです。

私の質問は、受信したチャンク データを完全な json に連結して、結果のコンテナー タイプが残るようにするにはどうすればよいですSource[Out,M1]Flow[In,Out,M2]? の理念を貫きたいと思いakka-streamます。

更新:応答は無限であり、集計はリアルタイムで行う必要があることにも言及する価値があります

4

3 に答える 3

0

akka ストリームのドキュメントには、まさにこの問題に対するクックブックのエントリがあります: 「ByteString のストリームから行を解析する」。彼らの解決策は非常に冗長ですが、単一のチャンクに複数の行が含まれる状況も処理できます。チャンク サイズが複数の json メッセージを処理するのに十分な大きさに変更される可能性があるため、これはより堅牢に見えます。

于 2015-10-28T19:08:02.173 に答える
0
response.entity.dataBytes
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096))
.mapAsyncUnordered(Runtime.getRuntime.availableProcessors()) { data =>
  if (response.status == OK) {
    val event: Future[Event] = Unmarshal(data).to[Event]
    event.foreach(x => log.debug("Received event: {}.", x))
    event.map(Right(_))
  } else {
    Future.successful(data.utf8String)
      .map(Left(_))
  }
}

唯一の要件は、1 つのレコードの最大サイズを知っていることです。小さなものから始めた場合、デフォルトの動作は、レコードが制限よりも大きい場合に失敗することです。失敗する代わりに切り捨てるように設定できますが、JSON の一部は意味がありません。

于 2017-12-04T04:08:44.277 に答える