4

scala akka-http プロジェクトで mongo scala ドライバーの使用を開始しましたが、特に v2.0.0 でのケース クラスのサポートは非​​常に優れています。observeOnを使用して、デフォルト以外の実行コンテキストで mongo scala ドライバーを使用する方法に頭を悩ませようとしています。

Java ライブラリの依存関係の性質上、ブロッキング呼び出しを使用して MongoDB から結果を取得しています ( Helpersを参照) 。以下のように、observeOn を使用して MongoDBヘルパーの結果と headResult 関数を少し変更しましたが、解決方法がわからない奇妙な競合状態に気付きました。

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    def headResult()(implicit executionContext: ExecutionContext) = Await.result(observable.observeOn(executionContext).head(), Duration(10, TimeUnit.SECONDS))

    def results()(implicit executionContext: ExecutionContext): List[C] = Await.result(observable.observeOn(executionContext).toFuture(), Duration(20, TimeUnit.SECONDS)).toList
  }

結果関数は、私が期待しているすべてのレコードを返すわけではなく、1 つのスレッドのみを許可する akka PinnedDispatcherを使用する場合を除いて、動作は毎回異なります。ブロッキング操作であるため、HTTP リクエストをブロックしないように、デフォルト以外の akka ディスパッチャーを使用したいと考えています。誰かがこれで私を助けることができれば、本当に感謝しています。

  # looking up dispatcher 
  val executionContext = system.dispatchers.lookup("mongo-dispatcher")

  # application.conf
  mongo-dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      fixed-pool-size = 100
    }
    throughput = 1
  }

私のサンプル データベース クライアント コード:

def getPersonById(personId: String): PersonCaseClass = {
    personCollection.find[PersonCaseClass](equal("_id", "person_12345")).first().headResult()
}
4

0 に答える 0