ThreadPoolExecutor
最大サイズに達してキューがいっぱいになると、新しいタスクを追加しようとするとsubmit()
メソッドがブロックされるようなものを作成したいと考えています。そのためのカスタムを実装する必要RejectedExecutionHandler
がありますか、それとも標準 Java ライブラリを使用してこれを行う既存の方法はありますか?
17 に答える
私が今見つけた可能な解決策の1つ:
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}
他に解決策はありますか?RejectedExecutionHandler
そのような状況を処理するための標準的な方法のように思われるので、私はに基づいたものを好みます。
ThreadPoolExecutor とブロッキングキューを使用できます。
public class ImageManager {
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread,
0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);
private int downloadThumbnail(String fileListPath){
executorService.submit(new yourRunnable());
}
}
CallerRunsPolicy
呼び出し元のスレッドで拒否されたタスクを実行する、を使用する必要があります。このように、タスクが完了するまで、新しいタスクをエグゼキュータに送信することはできません。その時点で、いくつかの空きプールスレッドが存在するか、プロセスが繰り返されます。
ドキュメントから:
拒否されたタスク
メソッドexecute(java.lang.Runnable)で送信された新しいタスクは、Executorがシャットダウンされたとき、およびExecutorが最大スレッドとワークキュー容量の両方に有限の境界を使用し、飽和状態になったときに拒否されます。いずれの場合も、executeメソッドは、RejectedExecutionHandlerのRejectedExecutionHandler.rejectedExecution(java.lang.Runnable、java.util.concurrent.ThreadPoolExecutor)メソッドを呼び出します。4つの事前定義されたハンドラーポリシーが提供されます。
- デフォルトのThreadPoolExecutor.AbortPolicyでは、ハンドラーは拒否時にランタイムRejectedExecutionExceptionをスローします。
- ThreadPoolExecutor.CallerRunsPolicyでは、execute自体を呼び出すスレッドがタスクを実行します。これにより、新しいタスクが送信される速度を遅くする単純なフィードバック制御メカニズムが提供されます。
- ThreadPoolExecutor.DiscardPolicyでは、実行できないタスクは単にドロップされます。
- ThreadPoolExecutor.DiscardOldestPolicyで、エグゼキュータがシャットダウンされていない場合、ワークキューの先頭にあるタスクがドロップされ、実行が再試行されます(これにより、再度失敗し、これが繰り返される可能性があります)。
また、コンストラクターを呼び出すときは、ArrayBlockingQueueなどの制限付きキューを使用してくださいThreadPoolExecutor
。そうしないと、何も拒否されません。
編集:コメントに応じて、ArrayBlockingQueueのサイズをスレッドプールの最大サイズと等しくなるように設定し、AbortPolicyを使用します。
編集2:わかりました、あなたが何をしているのかわかります。beforeExecute()
これはどうですか:メソッドをオーバーライドして、をgetActiveCount()
超えていないことを確認し、超えgetMaximumPoolSize()
ている場合は、スリープして再試行しますか?
私は知っています、それはハックですが、私の意見では、ここで提供されるものの中で最もクリーンなハックです;-)
ThreadPoolExecutor は「put」ではなくブロッキング キュー「offer」を使用するため、ブロッキング キューの「offer」の動作をオーバーライドします。
class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {
BlockingQueueHack(int size) {
super(size);
}
public boolean offer(T task) {
try {
this.put(task);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}
}
ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));
私はそれをテストしましたが、うまくいくようです。いくつかのタイムアウト ポリシーの実装は、読者の課題として残されています。
Hibernate には、BlockPolicy
単純で、必要なことができる があります。
参照: Executors.java
/**
* A handler for rejected tasks that will have the caller block until
* space is available.
*/
public static class BlockPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>BlockPolicy</tt>.
*/
public BlockPolicy() { }
/**
* Puts the Runnable to the blocking queue, effectively blocking
* the delegating thread until space is available.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put( r );
}
catch (InterruptedException e1) {
log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
}
}
}
上記のJava Concurrency in PracticeBoundedExecutor
から引用された回答は、Executor に無制限のキューを使用する場合、またはバインドされたセマフォがキュー サイズを超えない場合にのみ正しく機能します。セマフォは、サブミット スレッドとプール内のスレッド間で共有される状態であり、キュー サイズ < バインド <= (キュー サイズ + プール サイズ) であってもエグゼキュータを飽和させることができます。
使用CallerRunsPolicy
は、タスクが永久に実行されない場合にのみ有効です。この場合、送信スレッドはrejectedExecution
永久に残ります。タスクの実行に時間がかかる場合は、送信スレッドが新しいタスクを送信できないか、または送信できないため、悪い考えです。タスク自体を実行している場合は、他のことを行います。
それが受け入れられない場合は、タスクを送信する前に、executor のバインドされたキューのサイズを確認することをお勧めします。キューがいっぱいの場合は、しばらく待ってから再度送信してください。スループットは低下しますが、提案されている他の多くのソリューションよりも単純なソリューションであり、タスクが拒否されないことが保証されていることをお勧めします。
次のクラスは、ThreadPoolExecutor をラップし、Semaphore を使用してブロックすると、ワーク キューがいっぱいになります。
public final class BlockingExecutor {
private final Executor executor;
private final Semaphore semaphore;
public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
this.semaphore = new Semaphore(queueSize + maxPoolSize);
}
private void execImpl (final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executor.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
// will never be thrown with an unbounded buffer (LinkedBlockingQueue)
semaphore.release();
throw e;
}
}
public void execute (Runnable command) throws InterruptedException {
execImpl(command);
}
}
このラッパー クラスは、Brian Goetz 著の『Java Concurrency in Practice』という本に記載されているソリューションに基づいています。Executor
この本の解決策は、セマフォに使用される an と boundの 2 つのコンストラクタ パラメータのみを取ります。これは、Fixpoint の回答に示されています。このアプローチには問題があります。プール スレッドがビジーで、キューがいっぱいであるにもかかわらず、セマフォが許可を解放したばかりの状態になる可能性があります。( semaphore.release()
finally ブロック内)。この状態では、新しいタスクは解放されたばかりの許可を取得できますが、タスク キューがいっぱいであるため拒否されます。もちろん、これはあなたが望むものではありません。この場合はブロックします。
これを解決するには、JCiP が明確に述べているように、無制限のキューを使用する必要があります。セマフォはガードとして機能し、仮想キュー サイズの影響を与えます。maxPoolSize + virtualQueueSize + maxPoolSize
これには、ユニットにタスクを含めることができるという副作用があります。何故ですか?semaphore.release()
finally ブロックにあるためです
。すべてのプール スレッドが同時にこのステートメントを呼び出すと、maxPoolSize
許可が解放され、同じ数のタスクがユニットに入ることができます。バインドされたキューを使用していた場合、まだいっぱいで、タスクが拒否されます。これはプール スレッドがほぼ完了したときにのみ発生することがわかっているため、これは問題ではありません。プール スレッドがブロックされないことがわかっているため、すぐにタスクがキューから取り出されます。
ただし、制限付きキューを使用できます。そのサイズが等しいことを確認してvirtualQueueSize + maxPoolSize
ください。それ以上のサイズは役に立たず、セマフォはそれ以上のアイテムを入れることを防ぎます。サイズが小さいと、タスクが拒否されます。サイズが小さくなると、タスクが拒否される可能性が高くなります。たとえば、maxPoolSize=2 および virtualQueueSize=5 の制限付きエグゼキュータが必要だとします。次に、5+2=7 パーミットと 5+2=7 の実際のキュー サイズを持つセマフォを使用します。ユニットに入れることができるタスクの実際の数は、2 + 5 + 2 = 9 です。エグゼキュータがいっぱいになると (キューに 5 つのタスク、スレッド プールに 2 つ、したがって使用可能なパーミットは 0)、すべてのプール スレッドがパーミットを解放すると、入ってくるタスクはちょうど 2 つのパーミットを取ることができます。
現在、JCiP のソリューションは、これらのすべての制約 (無制限のキュー、またはそれらの数学制限による境界など) を強制するわけではないため、使用するのがやや面倒です。これは、すでに利用可能な部分に基づいて新しいスレッド セーフなクラスを構築する方法を示す良い例にすぎないと思いますが、完全に成長した再利用可能なクラスとしてではありません。後者は作者の意図ではないと思います。
このようにカスタム RejectedExecutionHandler を使用できます
ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
max_handlers, // max size
timeout_in_seconds, // idle timeout
TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// This will block if the queue is full
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
}
});
最近、この質問に同じ問題があることがわかりました。OP は明示的には言いませんがRejectedExecutionHandler
、サブミッターのスレッドでタスクを実行する を使用したくありません。このタスクが長時間実行されている場合、ワーカー スレッドが十分に活用されないためです。
すべての回答とコメント、特にセマフォまたは使用による欠陥のあるソリューションを読んで、 ThreadPoolExecutorafterExecute
のコードを詳しく調べて、何らかの方法があるかどうかを確認しました。2000 行以上の (コメント付きの) コードがあることに驚きました。中には目まいがするものもあります。私が実際に持っているかなり単純な要件 --- 1 つのプロデューサー、複数のコンシューマー、コンシューマーが作業できないときにプロデューサーをブロックさせる --- を考えると、私は独自のソリューションを展開することにしました。ではなく、単なるです。また、スレッドの数を作業負荷に適応させるのではなく、固定数のスレッドのみを保持するため、これも私の要件に適合します。これがコードです。気軽に怒鳴ってください:-)ExecutorService
Executor
package x;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
/**
* distributes {@code Runnable}s to a fixed number of threads. To keep the
* code lean, this is not an {@code ExecutorService}. In particular there is
* only very simple support to shut this executor down.
*/
public class ParallelExecutor implements Executor {
// other bounded queues work as well and are useful to buffer peak loads
private final BlockingQueue<Runnable> workQueue =
new SynchronousQueue<Runnable>();
private final Thread[] threads;
/*+**********************************************************************/
/**
* creates the requested number of threads and starts them to wait for
* incoming work
*/
public ParallelExecutor(int numThreads) {
this.threads = new Thread[numThreads];
for(int i=0; i<numThreads; i++) {
// could reuse the same Runner all over, but keep it simple
Thread t = new Thread(new Runner());
this.threads[i] = t;
t.start();
}
}
/*+**********************************************************************/
/**
* returns immediately without waiting for the task to be finished, but may
* block if all worker threads are busy.
*
* @throws RejectedExecutionException if we got interrupted while waiting
* for a free worker
*/
@Override
public void execute(Runnable task) {
try {
workQueue.put(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("interrupt while waiting for a free "
+ "worker.", e);
}
}
/*+**********************************************************************/
/**
* Interrupts all workers and joins them. Tasks susceptible to an interrupt
* will preempt their work. Blocks until the last thread surrendered.
*/
public void interruptAndJoinAll() throws InterruptedException {
for(Thread t : threads) {
t.interrupt();
}
for(Thread t : threads) {
t.join();
}
}
/*+**********************************************************************/
private final class Runner implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
Runnable task;
try {
task = workQueue.take();
} catch (InterruptedException e) {
// canonical handling despite exiting right away
Thread.currentThread().interrupt();
return;
}
try {
task.run();
} catch (RuntimeException e) {
// production code to use a logging framework
e.printStackTrace();
}
}
}
}
}
私は最近、似たようなことを達成する必要がありましたが、ScheduledExecutorService
.
また、メソッドに渡される遅延を処理し、呼び出し元が期待する時間にタスクが送信されて実行されるか、単に失敗してRejectedExecutionException
.
ScheduledThreadPoolExecutor
タスクを内部的に実行または送信するための他のメソッドは#schedule
、オーバーライドされたメソッドを引き続き呼び出します。
import java.util.concurrent.*;
public class BlockingScheduler extends ScheduledThreadPoolExecutor {
private final Semaphore maxQueueSize;
public BlockingScheduler(int corePoolSize,
ThreadFactory threadFactory,
int maxQueueSize) {
super(corePoolSize, threadFactory, new AbortPolicy());
this.maxQueueSize = new Semaphore(maxQueueSize);
}
@Override
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
}
@Override
protected void afterExecute(Runnable runnable, Throwable t) {
super.afterExecute(runnable, t);
try {
if (t == null && runnable instanceof Future<?>) {
try {
((Future<?>) runnable).get();
} catch (CancellationException | ExecutionException e) {
t = e;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null) {
System.err.println(t);
}
} finally {
releaseQueueUsage();
}
}
private long beforeSchedule(Runnable runnable, long delay) {
try {
return getQueuePermitAndModifiedDelay(delay);
} catch (InterruptedException e) {
getRejectedExecutionHandler().rejectedExecution(runnable, this);
return 0;
}
}
private long beforeSchedule(Callable callable, long delay) {
try {
return getQueuePermitAndModifiedDelay(delay);
} catch (InterruptedException e) {
getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
return 0;
}
}
private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
final long beforeAcquireTimeStamp = System.currentTimeMillis();
maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS);
final long afterAcquireTimeStamp = System.currentTimeMillis();
return afterAcquireTimeStamp - beforeAcquireTimeStamp;
}
private void releaseQueueUsage() {
maxQueueSize.release();
}
}
ここにコードがあります。フィードバックをいただければ幸いです。 https://github.com/AmitabhAwasthi/BlockingScheduler
@FixPoint ソリューションの問題を回避するため。ListeningExecutorService を使用して、FutureCallback 内のセマフォ onSuccess および onFailure を解放することができます。
Executor が使用する独自のブロッキング キューを作成し、探しているブロッキング動作で、常に利用可能な残りの容量を返します (Executor がコア プールよりも多くのスレッドを作成しようとしたり、拒否ハンドラーをトリガーしたりしないようにします)。
これにより、探しているブロッキング動作が得られると思います。拒否ハンドラは、エグゼキュータがタスクを実行できないことを示しているため、請求書に適合することはありません。私が想像できるのは、ハンドラーで何らかの形の「ビジー待機」が発生することです。それはあなたが望むものではありません。呼び出し元をブロックするエグゼキュータのキューが必要です...
エラスティック検索クライアントでこの拒否ポリシーを見つけました。ブロッキング キューで呼び出し元スレッドをブロックします。以下のコード-
static class ForceQueuePolicy implements XRejectedExecutionHandler
{
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try
{
executor.getQueue().put(r);
}
catch (InterruptedException e)
{
//should never happen since we never wait
throw new EsRejectedExecutionException(e);
}
}
@Override
public long rejected()
{
return 0;
}
}
以前も同じニーズがありました。共有スレッド プールに支えられた、クライアントごとに固定サイズの一種のブロッキング キューです。私は自分の種類の ThreadPoolExecutor を書くことになりました:
UserThreadPoolExecutor (ブロッキング キュー (クライアントごと) + スレッドプール (すべてのクライアント間で共有))
参照: https://github.com/d4rxh4wx/UserThreadPoolExecutor
各 UserThreadPoolExecutor には、共有 ThreadPoolExecutor から最大数のスレッドが与えられます
各 UserThreadPoolExecutor は次のことができます。
- クォータに達していない場合、タスクを共有スレッド プール エグゼキュータにサブミットします。クォータに達すると、ジョブはキューに入れられます (CPU を待機する非消費ブロック)。送信されたタスクの 1 つが完了すると、クォータが減り、別のタスクが ThreadPoolExecutor に送信されるのを待機できるようになります。
- 残りのタスクが完了するまで待ちます