13

ThreadPoolExecutor を使用してタスクをスケジュールしようとしていますが、そのポリシーで問題が発生しています。その動作は次のとおりです。

  1. 実行中のスレッドが corePoolSize よりも少ない場合、Executor は常に、キューに入れるよりも新しいスレッドを追加することを優先します。
  2. corePoolSize 以上のスレッドが実行されている場合、Executor は常に、新しいスレッドを追加するよりも要求をキューに入れることを優先します。
  3. 要求をキューに入れることができない場合、これが maximumPoolSize を超えない限り、新しいスレッドが作成されます。この場合、タスクは拒否されます。

私が望む動作はこれです:

  1. 同上
  2. corePoolSize より多いが maximumPoolSize より少ないスレッドが実行されている場合、キューイングよりも新しいスレッドの追加を優先し、新しいスレッドの追加よりもアイドル スレッドの使用を優先します。
  3. 同上

基本的に、どのタスクも拒否されたくありません。それらを無制限のキューに入れたい。しかし、私は maximumPoolSize スレッドまで持ちたいと思っています。無制限のキューを使用すると、coreSize に達するとスレッドが生成されません。バインドされたキューを使用すると、タスクが拒否されます。これを回避する方法はありますか?

私が今考えているのは、SynchronousQueue で ThreadPoolExecutor を実行することですが、タスクを直接フィードするのではなく、別の無制限の LinkedBlockingQueue にフィードすることです。次に、別のスレッドが LinkedBlockingQueue から Executor にフィードし、拒否された場合は、拒否されなくなるまで再試行します。これは面倒でちょっとしたハックのように思えますが、これを行うためのよりクリーンな方法はありますか?

4

4 に答える 4

4

要求されたスレッド プールを細かく管理する必要はおそらくありません。

キャッシュされたスレッド プールは、潜在的に無制限の同時スレッドを許可しながら、アイドル スレッドを再利用します。これはもちろん、バースト期間中のコンテキスト切り替えのオーバーヘッドにより、パフォーマンスが暴走する可能性があります。

Executors.newCachedThreadPool();

アイドル状態のスレッドが最初に使用されることを保証するという概念を破棄して、スレッドの総数を制限することをお勧めします。構成の変更は次のとおりです。

corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

このシナリオの理由は、エグゼキューターがcorePoolSizeスレッド数よりも少ない場合、非常にビジーであってはならないということです。システムがあまりビジーでない場合は、新しいスレッドをスピンアップしても害はほとんどありません。これを行うと、ThreadPoolExecutor許可されているワーカーの最大数を下回っている場合、常に新しいワーカーが作成されます。最大数のワーカーが「実行中」の場合にのみ、タスクを空しく待機しているワーカーにタスクが与えられます。aReasonableTimeDurationワーカーがタスクなしで待機している場合、ワーカーは終了できます。プール サイズに妥当な制限を適用し (結局のところ、CPU の数は限られています)、タイムアウトを適度に大きくすると (スレッドが不必要に終了しないようにするため)、望ましい効果が得られる可能性があります。

最後のオプションはハックです。基本的に、はキューに容量があるかどうかを判断するためにThreadPoolExecutor内部的に使用します。BlockingQueue.offerのカスタム実装は、試行BlockingQueueを常に拒否する可能性があります。offerがキューへのタスクにThreadPoolExecutor失敗するofferと、新しいワーカーを作成しようとします。新しいワーカーを作成できない場合は、 aRejectedExecutionHandlerが呼び出されます。その時点で、カスタムは aをカスタムにRejectedExecutionHandler強制することができます。putBlockingQueue

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
    BlockingQueue<T> delegate;

    public boolean offer(T item) {
        return false;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        delegate.put(r);
    }

    //.... delegate methods
}
于 2010-08-06T06:01:49.003 に答える
1

あなたのユースケースは一般的で、完全に合法であり、残念ながら予想よりも難しいものです。背景情報については、このディスカッションを読んで、ここで解決策(スレッドでも言及されています)へのポインタを見つけることができます。Shayのソリューションは正常に機能します。

一般的に、私は無制限のキューに少し警戒するでしょう。通常は、正常に機能を低下させ、現在/残りの作業の比率を調整して、生産者または消費者を圧倒しない明示的な流入フロー制御を使用することをお勧めします。

于 2010-08-06T08:01:50.703 に答える
1

corePoolsize = maximumPoolSize無制限のキューを設定して使用するだけですか?

corePoolSizeポイントのリストでは、常に より小さいか等しいため、1 は 2 を除外しmaximumPoolSizeます。

編集

あなたが望むものと TPE が提供するものとの間には、まだ相容れないものがあります。

無制限のキューがある場合maximumPoolSizeは無視されるため、観察したように、これ以上のcorePoolSizeスレッドが作成および使用されることはありません。

繰り返しになりますがcorePoolsize = maximumPoolSize、無制限のキューを使用すると、必要なものが得られますよね?

于 2010-08-05T21:41:04.970 に答える
1

キャッシュされたスレッド プールのようなものをお探しですか?

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool()

于 2010-08-06T00:08:50.063 に答える