scala akka-http プロジェクトで mongo scala ドライバーの使用を開始しましたが、特に v2.0.0 でのケース クラスのサポートは非常に優れています。observeOnを使用して、デフォルト以外の実行コンテキストで mongo scala ドライバーを使用する方法に頭を悩ませようとしています。
Java ライブラリの依存関係の性質上、ブロッキング呼び出しを使用して MongoDB から結果を取得しています ( Helpersを参照) 。以下のように、observeOn を使用して MongoDBヘルパーの結果と headResult 関数を少し変更しましたが、解決方法がわからない奇妙な競合状態に気付きました。
trait ImplicitObservable[C] {
val observable: Observable[C]
val converter: (C) => String
def headResult()(implicit executionContext: ExecutionContext) = Await.result(observable.observeOn(executionContext).head(), Duration(10, TimeUnit.SECONDS))
def results()(implicit executionContext: ExecutionContext): List[C] = Await.result(observable.observeOn(executionContext).toFuture(), Duration(20, TimeUnit.SECONDS)).toList
}
結果関数は、私が期待しているすべてのレコードを返すわけではなく、1 つのスレッドのみを許可する akka PinnedDispatcherを使用する場合を除いて、動作は毎回異なります。ブロッキング操作であるため、HTTP リクエストをブロックしないように、デフォルト以外の akka ディスパッチャーを使用したいと考えています。誰かがこれで私を助けることができれば、本当に感謝しています。
# looking up dispatcher
val executionContext = system.dispatchers.lookup("mongo-dispatcher")
# application.conf
mongo-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 100
}
throughput = 1
}
私のサンプル データベース クライアント コード:
def getPersonById(personId: String): PersonCaseClass = {
personCollection.find[PersonCaseClass](equal("_id", "person_12345")).first().headResult()
}