すべての提案をありがとう!
結局、私はかなり単純であると私が信じる何かを選びました。CountDownLatchがほとんど必要なものであることがわかりました。カウンターが0に達するまでブロックします。唯一の問題は、カウントダウンのみが可能で、アップはできないため、タスクが新しいタスクを送信できる動的設定では機能しないことです。したがって、CountLatch
追加機能を提供する新しいクラスを実装しました。(以下を参照)次に、このクラスを次のように使用します。
メインスレッドが呼び出しlatch.awaitZero()
、ラッチが0に達するまでブロックします。
executor.execute(..)
呼び出しを呼び出す前の任意のスレッドlatch.increment()
。
完了する直前のすべてのタスクは、を呼び出しますlatch.decrement()
。
最後のタスクが終了すると、カウンターは0に到達し、メインスレッドを解放します。
さらなる提案やフィードバックは大歓迎です!
public class CountLatch {
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected int acquireNonBlocking(int acquires) {
// increment count
for (;;) {
int c = getState();
int nextc = c + 1;
if (compareAndSetState(c, nextc))
return 1;
}
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountLatch(int count) {
this.sync = new Sync(count);
}
public void awaitZero() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void increment() {
sync.acquireNonBlocking(1);
}
public void decrement() {
sync.releaseShared(1);
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
increment()
/呼び出しは、たとえばSami Korhonenによって提案されたように、またはimplによって提案されたように、カスタマイズされたサブクラスにカプセルdecrement()
化できることに注意してください。ここを参照してください:Executor
beforeExecute
afterExecute
public class CountingThreadPoolExecutor extends ThreadPoolExecutor {
protected final CountLatch numRunningTasks = new CountLatch(0);
public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable command) {
numRunningTasks.increment();
super.execute(command);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
numRunningTasks.decrement();
super.afterExecute(r, t);
}
/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion() throws InterruptedException {
numRunningTasks.awaitZero();
}
/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
numRunningTasks.awaitZero(timeout, unit);
}
}