38

Akka HTTP を使用してリクエストを基本認証しようとしています。たまたま、認証する外部リソースがあるため、このリソースに対して残りの呼び出しを行う必要があります。

これには時間がかかります。処理中は、API の残りの部分がブロックされ、この呼び出しを待っているようです。これを非常に簡単な例で再現しました。

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

ログ エンドポイントに投稿すると、ログ エンドポイントが指示した 5 秒間待機して、get エンドポイントもスタックします。

これは予想される動作ですか? また、そうである場合、API 全体をブロックせずにブロック操作を行うにはどうすればよいですか?

4

2 に答える 2

134

あなたが観察したのは、予期された動作ですが、もちろん非常に悪いものです。それを防ぐための既知のソリューションとベストプラクティスが存在することは良いことです。この回答では、問題を短く、長く、そして詳細に説明するために時間を費やしたいと思います - 読んで楽しんでください!

簡単な答え:「ルーティング インフラストラクチャをブロックしないでください!」、ブロック操作には常に専用のディスパッチャを使用してください!

観察された症状の原因:問題はcontext.dispatcher、ブロック先物が実行されるディスパッチャとして使用していることです。同じディスパッチャー (簡単に言えば、単なる「スレッドの束」) がルーティング インフラストラクチャによって使用され、着信要求を実際に処理します。したがって、使用可能なすべてのスレッドをブロックすると、ルーティング インフラストラクチャが枯渇することになります。(議論とベンチマークの課題は、Akka HTTP がこれを防ぐことができるかどうかです。それを私の研究 todo リストに追加します)。

ブロッキングは、同じディスパッチャーの他のユーザーに影響を与えないように、特別な注意を払って処理する必要があります (これが、Akka のドキュメント セクションで説明されているように、実行を別のユーザーに簡単に分離できるようにするためです):ブロッキングには慎重な管理が必要です。

ここで注意したいのは、可能な限り API をブロックすることは避けるべきだということです。長時間実行される操作が実際には 1 つの操作ではなく一連の操作である場合は、それらを別のアクターまたはシーケンスされた先物に分離することができます。とにかく、指摘したかっただけです - 可能であれば、そのようなブロッキングコールを避けてください.

詳細な分析とソリューション:

概念的に何が間違っているかがわかったので、上記のコードで正確に何が壊れているのか、そしてこの問題の正しい解決策がどのように見えるかを見てみましょう。

色=糸の状態:

  • ターコイズ – 眠っている
  • オレンジ - 待機中
  • 緑 - 実行可能

それでは、3 つのコードと、ディスパッチャーへの影響とアプリのパフォーマンスを調べてみましょう。この動作を強制するために、アプリには次の負荷がかかります。

  • [a] GET リクエストをリクエストし続けます (最初の質問の上記のコードを参照してください)。そこではブロックされません。
  • [b] その後、しばらくして 2000 の POST リクエストを発行します。これにより、未来を返す前に 5 秒間のブロックが発生します。

1) [bad]不正なコードに対する Dispatcher の動作:

// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses defaultDispatcher
      Thread.sleep(5000)                    // will block on the default dispatcher,
      System.currentTimeMillis().toString   // starving the routing infra
    }
  }
}

したがって、アプリを [a] 負荷に公開すると、多数の akka.actor.default-dispatcher スレッドが既に表示されていることがわかります。これらはリクエストを処理しています。小さな緑色のスニペットとオレンジ色は、他のスレッドが実際にそこでアイドル状態であることを意味します。

ブロッキングはデフォルトのディスパッチャを強制終了しています

次に、[b] ロードを開始します。これにより、これらのスレッドがブロックされます。初期のスレッド「default-dispatcher-2,3,4」が、以前にアイドル状態だった後にブロックされていることがわかります。また、プールが拡大することも観察します。新しいスレッドは「default-dispatcher-18,19,20,21...」で開始されますが、すぐにスリープ状態になります (!) - ここで貴重なリソースを浪費しています!

このように開始されたスレッドの数は、デフォルトのディスパッチャ構成によって異なりますが、50 程度を超えることはありません。2k のブロッキング op ​​を起動したばかりなので、スレッドプール全体を枯渇させます。ルーティング インフラストラクチャが他のリクエストを処理するために使用できるスレッドを持たないように、ブロッキング オペレーションが支配的です。非常に悪いことです。

それについて何かしましょう (これは、偶然にも Akka のベスト プラクティスです。以下に示すように、常にブロック動作を分離してください)。

2) [good!]ディスパッチャーの動作 適切な構造化コード/ディスパッチャー:

application.confブロック動作専用のこのディスパッチャーを設定します。

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // in Akka previous to 2.4.2:
    core-pool-size-min = 16
    core-pool-size-max = 16
    max-pool-size-min = 16
    max-pool-size-max = 16
    // or in Akka 2.4.2+
    fixed-pool-size = 16
  }
  throughput = 100
}

ここでのさまざまなオプションを理解するには、 Akka Dispatchersのドキュメントを読んでください。ただし、重要な点はThreadPoolExecutor、ブロッキング ops に使用できるスレッドのハード リミットを持つ を選択したことです。サイズの設定は、アプリの機能とサーバーのコア数によって異なります。

次に、デフォルトの代わりにそれを使用する必要があります。

// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

val routes: Route = post { 
  complete {
    Future { // uses the good "blocking dispatcher" that we configured, 
             // instead of the default dispatcher – the blocking is isolated.
      Thread.sleep(5000)
      System.currentTimeMillis().toString
    }
  }
}

同じ負荷を使用してアプリに圧力をかけます。最初に少し通常のリクエストを行い、次にブロックするリクエストを追加します。この場合、ThreadPools は次のように動作します。

ブロッキングプールは私たちのニーズに合わせてスケーリングします

したがって、最初は通常のリクエストはデフォルトのディスパッチャーによって簡単に処理されます。そこにいくつかの緑色の線が表示されます。これが実際の実行です (実際にはサーバーに重い負荷をかけているわけではないので、ほとんどアイドル状態です)。

ブロッキング ops の発行を開始するmy-blocking-dispatcher-*と、構成されたスレッドの数まで開始されます。そこにあるすべての Sleeping を処理します。また、これらのスレッドで一定期間何も起こらないと、スレッドをシャットダウンします。別のブロックでサーバーにヒットした場合、プールは新しいスレッドを開始し、sleep() を処理しますが、その間、貴重なスレッドを無駄にすることはありません。何もしない"。

このセットアップを使用すると、通常の GET リクエストのスループットは影響を受けず、(まだかなり無料の) デフォルトのディスパッチャで問題なく処理されました。

これは、リアクティブ アプリケーションであらゆる種類のブロッキングに対処するための推奨される方法です。多くの場合、アプリの動作の悪い部分を「隔壁化」(または「分離」) すると呼ばれます。この場合、悪い動作はスリープ/ブロックです。

3) [workaround-ish]blocking適切に適用された場合の Dispatcher の動作:

この例では、ブロッキング ops に直面したときに役立つscaladoc forメソッドを使用します。scala.concurrent.blocking通常、ブロック操作を生き残るために、より多くのスレッドがスピンアップされます。

// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses the default dispatcher (it's a Fork-Join Pool)
      blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                 // but at the cost of exploding the number of threads (which eventually
                 // may also lead to starvation problems, but on a different layer)
        Thread.sleep(5000)
        System.currentTimeMillis().toString
       }
    }
  }
}

アプリは次のように動作します。

ブロックすると、より多くのスレッドが開始されます

多くの新しいスレッドが作成されていることに気付くでしょう。これは、「ああ、これはブロックされるので、さらにスレッドが必要です」というブロッキングのヒントがあるためです。これにより、ブロックされている合計時間は 1) の例よりも短くなりますが、ブロック操作が終了した後、何百ものスレッドが何もしていません...確かに、それらは最終的にシャットダウンされます (FJP はこれを行います)。 )、しかししばらくの間、大量の(制御されていない)スレッドが実行されますが、2)のソリューションとは対照的に、ブロック動作専用のスレッド数が正確にわかっています。

まとめ: デフォルトのディスパッチャーを決してブロックしないでください :-)

ベスト プラクティスは、 に示すパターンを使用し2)て、ブロック操作用のディスパッチャを使用可能にし、そこで実行することです。

議論された Akka HTTP バージョン:2.0.1

使用したプロファイラー:多くの人が、この回答に対して個人的に、上の写真でスレッドの状態を視覚化するために使用したプロファイラーを尋ねてきたので、ここにこの情報を追加します: 私は素晴らしい商用プロファイラー (OSS では無料) であるYourKitを使用しましたがOpenJDK の無料の VisualVMを使用して同じ結果を得ることができます。

于 2016-01-06T23:56:13.127 に答える
3

奇妙ですが、私にとってはすべて正常に動作します (ブロックなし)。コードは次のとおりです。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer

import scala.concurrent.Future


object Main {

  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val routes: Route = (post & entity(as[String])) { e =>
    complete {
      Future {
        Thread.sleep(5000)
        e
      }
    }
  } ~
    (get & path(Segment)) { r =>
      complete {
        "get"
      }
    }

  def main(args: Array[String]) {

    Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure {
      case e =>
        system.shutdown()
    }
  }
}

onCompleteまた、非同期コードをoronSuccessディレクティブにラップすることもできます:

onComplete(Future{Thread.sleep(5000)}){e} 

onSuccess(Future{Thread.sleep(5000)}){complete(e)}
于 2016-01-06T20:51:21.963 に答える