7

何らかの理由で、これを実装することに頭を悩ませることはできません。Elastic Searchを呼び出すアプリケーションをPlayで実行しています。私の設計の一環として、私のサービスは、このブログ投稿に示されているように、scala future でラップされた Java API を使用しています。その投稿のコードを更新して、次のようにブロッキング I/O を実行することを ExecutionContext に示唆しました。

    import scala.concurent.{blocking, Future, Promise}
    import org.elasticsearch.action.{ActionRequestBuilder, ActionListener, ActionResponse }
    def execute[RB <: ActionRequestBuilder[_, T, _, _]](request: RB): Future[T] = {
        blocking {
            request.execute(this)
            promise.future
        }
    }

ES に送信するクエリを作成する実際のサービスは、コンストラクター パラメーターとして executionContext を受け取り、エラスティック サーチの呼び出しに使用します。これを行ったのは、play が使用するグローバル実行コンテキストが、ES へのブロッキング呼び出しによってスレッドが拘束されないようにするためです。この SO コメントは、グローバル コンテキストのみがブロックを認識していることに言及しているため、独自のコンテキストを作成する必要があります。その同じ投稿/回答には、ForkJoin プールの使用に関する多くの情報がありますが、それらのドキュメントに書かれている内容を取得し、ブロッキング ドキュメントのヒントと組み合わせて、ブロッキングに応答する実行コンテキストを作成する方法がわかりません。ヒント。

私が抱えている問題の 1 つは、そもそもブロッキング コンテキストにどのように対応すればよいかわからないことだと思います。私はベストプラクティスを読んでいましたが、それが使用する例はスレッドの無制限のキャッシュです:

ここでは、無制限の「キャッシュされたスレッドプール」を使用することを好むため、制限がないことに注意してください。ブロッキング I/O を行う場合、ブロックできる十分な数のスレッドが必要であるという考えがあります。ただし、無制限が多すぎる場合は、ユースケースに応じて、後で微調整できます。このサンプルのアイデアは、ボールを転がすことです。

これは、ForkJoin でサポートされているスレッド プールを使用して、非ブロッキング I/O を処理するときにキャッシュされたスレッドを使用し、IO をブロックするための新しいスレッドを作成する必要があることを意味しますか? または、他の何か?個別のスレッド プールの使用についてオンラインで見つけたほぼすべてのリソースは、Neophytes ガイドが行うことを行う傾向があります。つまり、次のようになります。

さまざまなスレッド プールを調整する方法は、個々のアプリケーションに大きく依存するため、この記事の範囲を超えています。

アプリケーションに依存することはわかっていますが、この場合は、何らかのタイプのブロッキング対応の ExecutionContext を作成し、スレッドを管理するための適切な戦略を理解したい場合です。コンテキストがアプリケーションの特定の部分用である場合、固定スレッド プール サイズを作成blockingし、そもそもキーワードを使用/無視しないようにする必要がありますか?

私はとりとめのない傾向があるので、回答で探しているものを分解しようとします。

  1. コード!これらのドキュメントをすべて読んでも、ブロッキングを意識したコンテキストをコーディングすることはできないと感じているので、例を示していただければ幸いです。
  2. ブロッキングスレッドを処理する方法に関するリンクまたはヒント。つまり、それらのために新しいスレッドを無限に作成し、使用可能なスレッドの数を確認し、多すぎる場合は拒否します。他の戦略
  3. ここでパフォーマンスのヒントを探しているわけではありません。テストでしか得られないことはわかっていますが、そもそもコンテキストをコーディングする方法がわからない場合はテストできません! ForkJoins と threadpools の例を見つけましたが、重要な部分が欠けていblockingます。

ここで長い質問をして申し訳ありません。私が見ているものの感覚をあなたに伝えようとしているだけで、1 日以上この問題に頭を悩ませようとしていて、外部の助けが必要です。


編集:これを明確にするために、ElasticSearch サービスのコンストラクターの署名は次のとおりです。

//Note that these are not implicit parameters!
class ElasticSearchService(otherParams ..., val executionContext: ExecutionContext)

そして、私のアプリケーションの起動コードには、次のようなものがあります。

object Global extends GlobalSettings {
    val elasticSearchContext = //Custom Context goes here
    ...
    val elasticSearchService = new ElasticSearchService(params, elasticSearchContext);
    ...
}

私はPlay の contexts に関する推奨事項も読んでいますが、ヒントのブロックについてはまだ何も見ていませんBlockContext

4

1 に答える 1

3

だから私はドキュメンテーションを掘り下げ、私が扱っている状況に対するPlayのベストプラクティスは

特定の状況では、作業を他のスレッド プールにディスパッチしたい場合があります。これには、CPU 負荷の高い作業や、データベース アクセスなどの IO 作業が含まれる場合があります。これを行うには、まずスレッド プールを作成する必要があります。これは Scala で簡単に行うことができます。

そしていくつかのコードを提供します:

object Contexts {
    implicit val myExecutionContext: ExecutionContext = Akka.system.dispatchers.lookup("my-context")
}

コンテキストは Akka のものなので、そこに行き、提供されているコンテキストのデフォルトとタイプを探しました。最終的に、ディスパッチャに関するドキュメントにたどり着きました。デフォルトはForkJoinPoolで、ブロックを管理するためのデフォルトのメソッドはmanagedBlock(blocker). これにより、次のようなドキュメントを読むことになりました。

指定されたブロッカーに従ってブロックします。現在のスレッドが ForkJoinWorkerThread の場合、このメソッドは、現在のスレッドがブロックされている間に十分な並列性を確保するために、必要に応じて予備のスレッドをアクティブにするように手配します。

だから、もし私がForkJoinWorkerThreadそれを持っていれば、私が望むと思う行動が起こるようです. ForkJoinPool のソースをさらに見てみると、デフォルトのスレッド ファクトリは次のようになっていることに気付きました。

val defaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory = juc.ForkJoinPool.defaultForkJoinWorkerThreadFactory

これは、Akka でデフォルトを使用すると、期待どおりにブロッキングを処理するコンテキストが得られることを意味します。

したがって、Akka のドキュメントをもう一度読むと、コンテキストを次のように指定しているように見えます。

my-context {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 8
    parallelism-factor = 3.0
    parallelism-max = 64
    task-peeking-mode = "FIFO"
  }
  throughput = 100
}

私が欲しいものでしょう。

blockingソースコードを検索しているときに、呼び出しの使用または使用を探して、 ThreadPoolBuildermanagedBlockで ForkJoin の動作をオーバーライドする例を見つけました。

private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext {
    override def blockOn[T](thunk: ⇒ T)(implicit permission: CanAwait): T = {
      val result = new AtomicReference[Option[T]](None)
      ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
        def block(): Boolean = {
          result.set(Some(thunk))
          true
        }
        def isReleasable = result.get.isDefined
      })
      result.get.get // Exception intended if None
    }
  }

これは、BlockContext を実装するものを作成する方法の例として、私が最初に求めたもののようですそのファイルには、ExecutorServiceFactory を作成する方法を示すコードも含まれています。これexecutorは、構成の一部によって参照されていると私が信じているものです。したがって、完全にカスタムのコンテキストが必要な場合は、何らかのタイプの WorkerThread を拡張し、カスタム ワーカースレッドを使用する独自の ExecutorServiceFactory を作成し、この投稿のアドバイスのようにプロパティで完全修飾クラス名を指定することになると思います。

私はおそらく Akka の forkjoin を使用するつもりです :)

于 2016-01-28T17:34:37.370 に答える