問題タブ [blockingqueue]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - ThreadPoolExecutor ポリシー
ThreadPoolExecutor を使用してタスクをスケジュールしようとしていますが、そのポリシーで問題が発生しています。その動作は次のとおりです。
- 実行中のスレッドが corePoolSize よりも少ない場合、Executor は常に、キューに入れるよりも新しいスレッドを追加することを優先します。
- corePoolSize 以上のスレッドが実行されている場合、Executor は常に、新しいスレッドを追加するよりも要求をキューに入れることを優先します。
- 要求をキューに入れることができない場合、これが maximumPoolSize を超えない限り、新しいスレッドが作成されます。この場合、タスクは拒否されます。
私が望む動作はこれです:
- 同上
- corePoolSize より多いが maximumPoolSize より少ないスレッドが実行されている場合、キューイングよりも新しいスレッドの追加を優先し、新しいスレッドの追加よりもアイドル スレッドの使用を優先します。
- 同上
基本的に、どのタスクも拒否されたくありません。それらを無制限のキューに入れたい。しかし、私は maximumPoolSize スレッドまで持ちたいと思っています。無制限のキューを使用すると、coreSize に達するとスレッドが生成されません。バインドされたキューを使用すると、タスクが拒否されます。これを回避する方法はありますか?
私が今考えているのは、SynchronousQueue で ThreadPoolExecutor を実行することですが、タスクを直接フィードするのではなく、別の無制限の LinkedBlockingQueue にフィードすることです。次に、別のスレッドが LinkedBlockingQueue から Executor にフィードし、拒否された場合は、拒否されなくなるまで再試行します。これは面倒でちょっとしたハックのように思えますが、これを行うためのよりクリーンな方法はありますか?
java - ThreadPoolExecutor - ArrayBlockingQueue ... キューから要素を削除する前に待機する
次のことを行うスレッドを調整しようとしています。
スレッドが 1 つだけのスレッド プール[CorePoolSize =0, maxPoolSize = 1]
使用されるキューはArrayBlockingQueue です
Quesize = 20
BackGround:
スレッドは、リクエストを読み取って操作を実行しようとします。
しかし、最終的にはリクエストが非常に増加したため、スレッドは常にビジー状態になり、1 つの CPU を消費してリソースを大量に消費します。
私がやりたいことは、代わりにリクエストを一定間隔でサンプリングして処理することです。他のリクエストは安全に無視できます。
私がしなければならないことは、「操作」機能にスリープを入れて、タスクごとにスレッドがしばらくスリープしてCPUを解放することです。
質問:
ただし、次の要素を読み取る前に、基本的にそれ自体がしばらくスリープするキューを使用する方法があるかどうか疑問に思っていました。実行中にタスクをスリープさせ、実行を未完了のままにしておくのは、私には最適とは思えないため、これは理想的です。
他にもお勧めのタスクがあれば教えてください
ありがとう。
編集:ここにフォローアップの質問を追加し 、maxpool サイズを 1 に修正しました [急いで書いた] ..指摘してくれてありがとう tim。
java - 現在のスレッドがスリープ状態の場合、ThreadPoolExecutor は新しいスレッドを生成しますか
この質問は、この質問のフォローアップです。
基本的に私がやっていることは、ThreadPoolExecutor
1 つのスレッドだけで を宣言することです。各タスクが少し遅れて実行されるように、メソッドをオーバーライドしbeforeExecute()
てスリープ状態にします。私のスレッドは一種のスラッシングであるため、これは基本的にCPUを他のスレッドに譲るためです。
したがって、予想される動作は次のとおりです。
の新しいタスクごとにThreadPoolExecutor
、タスクを実行する前に before execute 関数を呼び出すため、タスクを実行する前に約 20 秒間スリープします。
しかし、これは私が見るものです:
送信された新しいタスクごとに:
- タスクを実行します
- beforeExecute メソッドを呼び出します
- 20秒くらい寝ます
- タスクを再実行してください!
1. と 2. の順序は常に同じではありません。
ここに私の質問があります:
- スリープ後/スリープ中に新しいスレッドが入ってきて、実際のスレッドがスリープしている間に先に進み、すぐにタスクを実行しているようです。
ではThreadPoolExecutor
、既存のスレッドがスリープ状態になるとすぐに、[スレッドが終了したと考えて] 新しいスレッドを生成しますか?? 私は keepAliveTime > sleeptime を入れようとしました..上記のアサーションが真の場合..少なくともスリープ時間以上待って新しいスレッドを生成するように...[その間、スリープ状態のスレッドが起動し、新しいスレッドを生成するというThreadPoolExecutor
考えを捨てます - 新しいスレッドを生成してすぐにタスクを実行したとしても、スリープ状態のスレッドが起動した後にタスクが再実行されるのはなぜですか !! その前にタスクキューからタスクを取り出してはいけませんか??
- ここで何か不足していますか?このシナリオをデバッグする他の方法はありますか?
=>目的のタスクを実行するために私が考えていた別の方法[問題を解決しない]は、ランナブルをもう1つのランナブルでラップし、内側のランナブルを呼び出す前に外側のランナブルをスリープさせることでした。
java - JavaのBlockingQueue設計に関する質問
メソッドjava.util.concurrent.BlockingQueue.add(E e)のJavaDocは次のように読み取ります。
boolean add(E e)
容量制限に違反せずにすぐに実行できる場合は、指定された要素をこのキューに挿入します。成功するとtrueを返し、現在使用可能なスペースがない場合はIllegalStateExceptionをスローします。容量制限のあるキューを使用する場合は、通常、offerを使用することをお勧めします。
私の質問は:それはfalseを返すことはありますか?そうでない場合、なぜこのメソッドはブール値を返すのですか?それは私には奇妙に思えます。この背後にある設計上の決定は何ですか?
あなたの知識をありがとう!
マヌエル
java - JMS は永続的なブロッキング キューの必要性に対する答えですか?
イベントをリモート サーバーに非同期的に送信する Log4J アペンダーで構成されるライブラリを作成しています。ログステートメントが作成されると、アペンダーはイベントをローカルキューに非同期に記録し、コンシューマーのプールが取得してリモートに送信します。
完全なインメモリ ソリューションは、同時実行の問題を処理する BlockingQueue を作成することです。ただし、キューを永続化して、リモートサーバーが使用できない場合にキューを無制限に拡張したり、キューが制限されている場合にメッセージを破棄したりしないようにしたいと考えています。
組み込みの H2 データベースを使用してイベントをローカルに保存し、ポーリング メカニズムを使用してイベントを取得し、リモートに送信することを考えていました。データベース テーブルをポーリングするよりも、むしろ BlockingQueue を使用したいと思います。
JMS が答えですか?
編集:
JMS が答えであり、そのように進んでいると思われる場合、処理中のメッセージのみを受け入れるように構成できる、軽量で埋め込み可能な JMS ソリューションに関する推奨事項はありますか? 言い換えれば、リッスンする TCP ソケットを開きたくないし、おそらく許可されないでしょう。
編集:
現在、ActiveMQ が組み込まれており、動作しているようです。皆さんありがとう。
c++ - C ++ pthreadブロッキングキューのデッドロック(私は思う)
デッドロックが発生していると思われるpthreadに問題があります。動作していると思ったブロッキングキューを作成しましたが、さらにテストを行った後、blocking_queueでブロックしている複数のスレッドをキャンセルしようとすると、デッドロックが発生するようです。
ブロッキングキューは非常に単純で、次のようになります。
テストのために、このブロッキングキューをプルする4つのスレッドを作成しました。いくつかのprintステートメントをブロッキングキューに追加しました。各スレッドはpthread_cond_wait()メソッドに到達しています。ただし、各スレッドでpthread_cancel()とpthread_join()を呼び出そうとすると、プログラムがハングします。
また、これを1つのスレッドでテストしたところ、完全に機能します。
ドキュメントによると、pthread_cond_wait()はキャンセルポイントであるため、これらのスレッドでcancelを呼び出すと、実行が停止するはずです(これは、1つのスレッドでのみ機能します)。ただし、pthread_mutex_lockはキャンセルポイントではありません。pthread_cancel()が呼び出され、キャンセルされたスレッドが終了する前にミューテックスを取得してロックを解除し、次のスレッドがキャンセルされたときにミューテックスとデッドロックを取得できないという状況で何かが起こっている可能性がありますか?または、私が間違っていることは他にありますか。
どんなアドバイスも素敵でしょう。ありがとう :)
java - Java: BlockingQueue を使用するプロデューサー/コンシューマー: 別のオブジェクトがキューに入れられるまで、コンシューマー スレッドに wait() を持たせる
最近、ポイントを取るコンシューマでスレッド関連の問題が発生しています。これはオリジナルで、常にキューをチェックして大量の CPU を占有することを除いては正常に動作します。cuePoint をさりげなく呼び出すことができ、メイン スレッドが継続するという考え方です。
cue 関数が呼び出されるたびに notify() を追加し、doFirstPoint() を次のように作り直すことで、CPU の問題を修正しようとしました。
しかし、notify() と wait() は同期関数でのみ機能することがわかりました。doFirstPoint と cuePoint を同期させると、cuePoint を呼び出したメイン スレッドが待機状態になります。
スレッドをオブジェクトにして直接通知するなど、これを回避するためのいくつかのアイデアがありましたが、それが修正されるよりも多くの問題を引き起こすか、非常に悪い形になるか、単に機能しないかはわかりませんでした. 私が見逃しているこの問題の簡単な解決策はありますか?
java - Socketから行を読み取り、それぞれをBlockingQueueに入れます
誰かがJavaで例を提供したり、ソケットから行を非同期的に読み取り、各行をBlockingQueueに入れるクラスの実装についてアドバイスしたりできますか。ソケットが接続されており、BlockingQueueとコンシューマーがすでに存在していると仮定します。
編集:もう1つ、非アクティブな状態が一定時間続くとタイムアウトし、コマンドですぐに停止する機能が必要です。
それは宿題ではありません、私は単にこれをうまくそして確実に行う方法の完全な例を見つけることができませんでした。
どうもありがとうございます。
java - ブロッキングキューを「閉じる」
非常に単純な生産者/消費者シナリオでjava.util.concurrent.BlockingQueueを使用しています。たとえば、この擬似コードは消費者の部分を表しています。
ここまでは順調ですね。ブロッキングキューのjavadocで、次のように読みました。
BlockingQueueは、アイテムが追加されないことを示すための「閉じる」または「シャットダウン」操作を本質的にサポートしていません。このような機能のニーズと使用法は、実装に依存する傾向があります。たとえば、一般的な戦術は、プロデューサーが特別なエンドオブストリームまたはポイズンオブジェクトを挿入することです。これらは、コンシューマーによって取得されたときにそれに応じて解釈されます。
残念ながら、使用されているジェネリックとComplexObjectの性質により、「ポイズンオブジェクト」をキューにプッシュするのは簡単ではありません。したがって、この「一般的な戦術」は、私のシナリオではあまり便利ではありません。
私の質問は、キューを「閉じる」ために他にどのような優れた戦術/パターンを使用できるかということです。
ありがとうございました!
java - ブロッキング優先度キューに保存されているカスタム オブジェクトの更新
タイプ Message のオブジェクトを格納するブロッキング プライオリティ キューがあります。メッセージには String[] data = new String[10] があります。ここで、ブロッキング キュー全体を反復する必要があります。そのオブジェクト メッセージの 2 番目の要素が着信メッセージの 6 番目の要素と等しいかどうかを確認します。
Messages のコンパレータは、更新が必要な 6 番目の要素に基づいていません。問題は、オブジェクトを取り出して同じ位置に配置する方法と、以下のコードを使用して更新すると、 iter.next() が実行されるたびに次のオブジェクトを指し始める可能性があることです。
これが私が試みていることです。