6

akka 2.3.6でspray 1.3.2を使用。(akka はスプレーのみに使用されます)。
巨大なファイルを読み取り、各行に対して http リクエストを作成する必要があります。
イテレータを使用してファイルを1行ずつ読み取り、アイテムごとにリクエストを行います。一部の行では正常に実行されますが、ある時点で失敗し始めます:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://default/user/IO-HTTP#-35162984]] after [60000 ms].
最初はサービスをオーバーロードしていると思ったので、「spray.can.host-connector.max-connections」を 1 に設定しました。実行速度はかなり遅くなりましたが、同じエラーが発生しました。

ここにコード:

import spray.http.MediaTypes._
val EdnType = register(
MediaType.custom(
  mainType = "application",
  subType = "edn",
  compressible = true,
  binary = false,
  fileExtensions = Seq("edn")))

val pipeline = (
  addHeader("Accept", "application/json")
  ~> sendReceive
  ~> unmarshal[PipelineResponse])

def postData(data: String) = {
  val request = Post(pipelineUrl).withEntity(HttpEntity.apply(EdnType, data))
  val responseFuture: Future[PipelineResponse] = pipeline(request)
  responseFuture
}

dataLines.map { d =>
  val f = postData(d)
  f.onFailure { case e => println("Error - "+e)} // This is where the errors are display
  f.map { p => someMoreLogic(d, p) }
}

aggrigateResults(dataLines)

データ全体は必要なく、いくつかの集計だけが必要なので、このようにします。

これを解決して完全に非同期に保つにはどうすればよいですか?

4

1 に答える 1

7

Akka の ask タイムアウトは firstCompletedOf を介して実装されるため、ask が初期化されるとタイマーが開始されます。

あなたがしているように見えるのは、(マップ中に)各行の Future を生成しているため、すべての呼び出しがほぼ同時に実行されます。フューチャーが初期化されるとタイムアウトのカウントが開始されますが、スポーンされたすべてのアクターが作業を行うためのエグゼキューター スレッドは残っていません。したがって、ask はタイムアウトします。

「すべてを一度に」処理する代わりに、より柔軟なアプローチをお勧めします - iteratees や akka-streams を使用するのといくらか似ています: Work Pulling Pattern。(ギットハブ)

としてすでに持っているイテレータを提供しますEpicWorker呼び出しといくつかのロジックを実行するアクターを紹介します。N workersその後スポーンするNと、同時に処理される行は最大でも 1 行になります (処理パイプラインには複数のステップが含まれる場合があります)。このようにして、エグゼキューターに過負荷をかけないようにし、タイムアウトが発生しないようにすることができます。

于 2014-11-16T19:19:50.177 に答える