13

私の質問はここでこれと強く関連しています。そこに投稿されたように、メインスレッドは、ワークキューが空になり、すべてのタスクが終了するまで待機する必要があります。ただし、私の状況での問題は、各タスクによって再帰的に新しいタスクが処理のために送信される可能性があることです。これにより、これらのタスクの将来をすべて収集するのが少し厄介になります。

現在のソリューションでは、ビジー待機ループを使用して終了を待機しています。

        do { //Wait until we are done the processing
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    } while (!executor.getQueue().isEmpty()
             || numTasks.longValue() > executor.getCompletedTaskCount());

numTasksは、新しいタスクが作成されるたびに増加する値です。これは機能しますが、ビジーウェイトのためあまり良くないと思います。明示的にウェイクアップされるまで、メインスレッドを同期的に待機させる良い方法があるかどうか疑問に思いました。

4

9 に答える 9

6

すべての提案をありがとう!

結局、私はかなり単純であると私が信じる何かを選びました。CountDownLatchがほとんど必要なものであることがわかりました。カウンターが0に達するまでブロックします。唯一の問題は、カウントダウンのみが可能で、アップはできないため、タスクが新しいタスクを送信できる動的設定では機能しないことです。したがって、CountLatch追加機能を提供する新しいクラスを実装しました。(以下を参照)次に、このクラスを次のように使用します。

メインスレッドが呼び出しlatch.awaitZero()、ラッチが0に達するまでブロックします。

executor.execute(..)呼び出しを呼び出す前の任意のスレッドlatch.increment()

完了する直前のすべてのタスクは、を呼び出しますlatch.decrement()

最後のタスクが終了すると、カウンターは0に到達し、メインスレッドを解放します。

さらなる提案やフィードバックは大歓迎です!

public class CountLatch {

@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected int acquireNonBlocking(int acquires) {
        // increment count
        for (;;) {
            int c = getState();
            int nextc = c + 1;
            if (compareAndSetState(c, nextc))
                return 1;
        }
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

public CountLatch(int count) {
    this.sync = new Sync(count);
}

public void awaitZero() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void increment() {
    sync.acquireNonBlocking(1);
}

public void decrement() {
    sync.releaseShared(1);
}

public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
}

}

increment()/呼び出しは、たとえばSami Korhonenによって提案されたように、またはimplによって提案されたように、カスタマイズされたサブクラスにカプセルdecrement()化できることに注意してください。ここを参照してください:ExecutorbeforeExecuteafterExecute

public class CountingThreadPoolExecutor extends ThreadPoolExecutor {

protected final CountLatch numRunningTasks = new CountLatch(0);

public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable command) {
    numRunningTasks.increment();
    super.execute(command);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
    numRunningTasks.decrement();
    super.afterExecute(r, t);
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion() throws InterruptedException {
    numRunningTasks.awaitZero();
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
    numRunningTasks.awaitZero(timeout, unit);
}

}
于 2013-01-26T22:38:23.827 に答える
5

Java 7は、 Phaserと呼ばれるこのユースケースに適合するシンクロナイザーを提供します。これは、CountDownLatchとCyclicBarrierの再利用可能なハイブリッドであり、登録されたパーティの数を増減できます(増分可能なCountDownLatchと同様)。

このシナリオでフェイザーを使用する基本的なパターンは、作成時にフェイザーにタスクを登録し、完了時に到着することです。到着したパーティの数が登録された数と一致すると、フェイザーは次のフェーズに「進み」、待機中のスレッドに進行が発生したときに通知します。

これは、再帰的なタスクの完了を待つために作成した例です。デモンストレーションの目的で、フィボナッチ数列の最初の数個を素朴に見つけます。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An example of using a Phaser to wait for the completion of recursive tasks.
 * @author Voxelot
 */
public class PhaserExample {
    /** Workstealing threadpool with reduced queue contention. */
    private static ForkJoinPool executors;

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws InterruptedException {
        executors = new ForkJoinPool();
        List<Long> sequence = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            sequence.add(fib(i));
        }
        System.out.println(sequence);
    }

    /**
     * Computes the nth Fibonacci number in the Fibonacci sequence.
     * @param n The index of the Fibonacci number to compute
     * @return The computed Fibonacci number
     */
    private static Long fib(int n) throws InterruptedException {
        AtomicLong result = new AtomicLong();
        //Flexible sychronization barrier
        Phaser phaser = new Phaser();
        //Base task
        Task initialTask = new Task(n, result, phaser);
        //Register fib(n) calling thread
        phaser.register();
        //Submit base task
        executors.submit(initialTask);
        //Make the calling thread arrive at the synchronization
        //barrier and wait for all future tasks to arrive.
        phaser.arriveAndAwaitAdvance();
        //Get the result of the parallel computation.
        return result.get();
    }

    private static class Task implements Runnable {
        /** The Fibonacci sequence index of this task. */
        private final int index;
        /** The shared result of the computation. */
        private final AtomicLong result;
        /** The synchronizer. */
        private final Phaser phaser;

        public Task(int n, AtomicLong result, Phaser phaser) {
            index = n;
            this.result = result;
            this.phaser = phaser;
            //Inform synchronizer of additional work to complete.
            phaser.register();
        }

        @Override
        public void run() {
            if (index == 1) {
                result.incrementAndGet();
            } else if (index > 1) {
                //recurrence relation: Fn = Fn-1 + Fn-2
                Task task1 = new Task(index - 1, result, phaser);
                Task task2 = new Task(index - 2, result, phaser);
                executors.submit(task1);
                executors.submit(task2);
            }
            //Notify synchronizer of task completion.
            phaser.arrive();
        }
    }
}
于 2014-05-18T23:37:53.643 に答える
4

これは実際には解決すべきかなり興味深い問題でした。コードを完全にテストしていないことを警告する必要があります。

アイデアは、単にタスクの実行を追跡することです。

  • タスクが正常にキューに入れられると、カウンターは1つインクリメントされます
  • タスクがキャンセルされ、実行されていない場合、カウンターは1つデクリメントされます
  • タスクが実行された場合、カウンターは1つデクリメントされます

シャットダウンが呼び出され、保留中のタスクがある場合、デリゲートは実際のExecutorServiceでシャットダウンを呼び出しません。保留中のタスク数がゼロになり、実際のExecutorServiceでシャットダウンが呼び出されるまで、新しいタスクをキューに入れることができます。

public class ResilientExecutorServiceDelegate implements ExecutorService {
    private final ExecutorService executorService;
    private final AtomicInteger pendingTasks;
    private final Lock readLock;
    private final Lock writeLock;
    private boolean isShutdown;

    public ResilientExecutorServiceDelegate(ExecutorService executorService) {
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        this.pendingTasks = new AtomicInteger();
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();
        this.executorService = executorService;
        this.isShutdown = false;
    }

    private <T> T addTask(Callable<T> task) {
        T result;
        boolean success = false;
        // Increment pending tasks counter
        incrementPendingTaskCount();
        try {
            // Call service
            result = task.call();
            success = true;
        } catch (RuntimeException exception) {
            throw exception;
        } catch (Exception exception) {
            throw new RejectedExecutionException(exception);
        } finally {
            if (!success) {
                // Decrement pending tasks counter
                decrementPendingTaskCount();
            }
        }
        return result;
    }

    private void incrementPendingTaskCount() {
        pendingTasks.incrementAndGet();
    }

    private void decrementPendingTaskCount() {
        readLock.lock();
        if (pendingTasks.decrementAndGet() == 0 && isShutdown) {
            try {
                // Shutdown
                executorService.shutdown();
            } catch (Throwable throwable) {
            }
        }
        readLock.unlock();
    }

    @Override
    public void execute(final Runnable task) {
        // Add task
        addTask(new Callable<Object>() {
            @Override
            public Object call() {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            task.run();
                        } finally {
                            decrementPendingTaskCount();
                        }
                    }
                });
                return null;
            }
        });
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        // Call service
        return executorService.awaitTermination(timeout, unit);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAll(tasks);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAll(tasks, timeout, unit);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAny(tasks);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
            long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        incrementPendingTaskCount();
        try {
            return executorService.invokeAny(tasks, timeout, unit);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }

    @Override
    public boolean isTerminated() {
        return executorService.isTerminated();
    }

    @Override
    public void shutdown() {
        // Lock write lock
        writeLock.lock();
        // Set as shutdown
        isShutdown = true;
        try {
            if (pendingTasks.get() == 0) {
                // Real shutdown
                executorService.shutdown();
            }
        } finally {
            // Unlock write lock
            writeLock.unlock();
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        // Lock write lock
        writeLock.lock();
        // Set as shutdown
        isShutdown = true;
        // Unlock write lock
        writeLock.unlock();

        return executorService.shutdownNow();
    }

    @Override
    public <T> Future<T> submit(final Callable<T> task) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<T>>() {
            @Override
            public Future<T> call() {
                return new FutureDelegate<T>(
                        executorService.submit(new Callable<T>() {
                            @Override
                            public T call() throws Exception {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    return task.call();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }), futureExecutionStatus);
            }
        });
    }

    @Override
    public Future<?> submit(final Runnable task) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<?>>() {
            @Override
            @SuppressWarnings("unchecked")
            public Future<?> call() {
                return new FutureDelegate<Object>(
                        (Future<Object>) executorService.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    task.run();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }), futureExecutionStatus);
            }
        });
    }

    @Override
    public <T> Future<T> submit(final Runnable task, final T result) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<T>>() {
            @Override
            public Future<T> call() {
                return new FutureDelegate<T>(executorService.submit(
                        new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    task.run();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }, result), futureExecutionStatus);
            }
        });
    }

    private class FutureExecutionStatus {
        private volatile boolean executed;

        public FutureExecutionStatus() {
            executed = false;
        }

        public void setExecuted() {
            executed = true;
        }

        public boolean isExecuted() {
            return executed;
        }
    }

    private class FutureDelegate<T> implements Future<T> {
        private Future<T> future;
        private FutureExecutionStatus executionStatus;

        public FutureDelegate(Future<T> future,
                FutureExecutionStatus executionStatus) {
            this.future = future;
            this.executionStatus = executionStatus;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = future.cancel(mayInterruptIfRunning);
            if (cancelled) {
                // Lock read lock
                readLock.lock();
                // If task was not executed
                if (!executionStatus.isExecuted()) {
                    decrementPendingTaskCount();
                }
                // Unlock read lock
                readLock.unlock();
            }
            return cancelled;
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            return future.get();
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            return future.get(timeout, unit);
        }

        @Override
        public boolean isCancelled() {
            return future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return future.isDone();
        }
    }
}
于 2013-01-26T16:18:30.523 に答える
1

カウンターを使ってみませんか?例えば:

private AtomicInteger counter = new AtomicInteger(0);

タスクをキューに送信する直前に、カウンターを1つインクリメントします。

counter.incrementAndGet();

タスクの最後に1つずつデクリメントします。

counter.decrementAndGet();

チェックは次のようになります。

// ...
while (counter.get() > 0);
于 2013-01-26T10:13:20.303 に答える
0

待機するスレッドの数がわかっていて、CountDownLatch( http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ )を使用して、1行のコードを貼り付けて各スレッドの数を増やすことができる場合CountDownLatch.html)それはあなたの問題を解決することができます

于 2013-01-26T10:06:19.057 に答える
0

リンク先の回答で提案されているオプションの1つは、CompletionServiceを使用することです。

メインスレッドでのビジーウェイトを次のように置き換えることができます。

while (true) {
    Future<?> f = completionService.take(); //blocks until task completes
    if (executor.getQueue().isEmpty()
         && numTasks.longValue() == executor.getCompletedTaskCount()) break;
}

getCompletedTaskCountおおよその数のみを返すため、より適切な終了条件を見つける必要がある場合があることに注意してください。

于 2013-01-26T09:57:21.867 に答える
0

Java 7には、ForkJoinPoolエグゼキューターを介した再帰タスクのサポートが組み込まれています。タスク自体がそれほど些細なものでない限り、使用は非常に簡単で、拡張性も非常に優れています。基本的に、基になるスレッドを無期限にブロックすることなく、タスクがサブタスクの完了を待機できるようにする制御されたインターフェイスを提供します。

于 2013-01-26T10:03:05.853 に答える
0

最後のタスクはそれが最後であることを知らないので、タスクの起動時と完了時の両方を記録せずに、この作業を100%正しく行うことは実際には不可能だと思います。

メモリが適切に機能する場合、getQueue()メソッドは、現在実行中のタスクではなく、まだ実行を待機しているタスクのみを含むキューを返します。さらに、getCompletedTaskCount()おおよそです。

私が考えている解決策は、Eng.Fouadの回答のようなアトミックカウンターと、メインスレッドにウェイクアップするように信号を送るための条件を使用して、次のようになります(簡単にするためにショートカットを許してください)。

public class MyThreadPoolExecutorState {

    public final Lock lock = new ReentrantLock();
    public final Condition workDone = lock.newCondition();
    public boolean workIsDone = false;

}

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final MyThreadPoolExecutorState state;
    private final AtomicInteger counter = new AtomicInteger(0);

    public MyThreadPoolExecutor(MyThreadPoolExecutorState state, ...) {
        super(...);
        this.state = state;
    }

    protected void beforeExecute(Thread t, Runnable r) {
        this.counter.incrementAndGet();
    }

    protected void afterExecute(Runnable r, Throwable t) {
        if(this.counter.decrementAndGet() == 0) {
            this.state.lock.lock();
            try {
                this.state.workIsDone = true;
                this.state.workDone.signal();
            }
            finally {
                this.state.lock.unlock();
            }
        }
    }

}

public class MyApp {

    public static void main(...) {

        MyThreadPoolExecutorState state = new MyThreadPoolExecutorState();
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(state, ...);

        // Fire ze missiles!
        executor.submit(...);

        state.lock.lock();
        try {
            while(state.workIsDone == false) {
                state.workDone.await();
            }
        }
        finally {
            state.lock.unlock();
        }

    }

}

もう少しエレガントかもしれませんが(おそらくgetState()スレッドプールエグゼキュータなどに提供するだけですか?)、それで仕事が完了するはずだと思います。また、テストされていないので、自分の危険で実装してください...

実行するタスクがない場合、このソリューションは間違いなく失敗することに注意してください。シグナルを無期限に待機します。したがって、実行するタスクがない場合でも、エグゼキュータを起動する必要はありません。


編集:考え直してみると、アトミックカウンターのインクリメントは、タスクの実行直前ではなく、送信時に行う必要があります(キューイングにより、カウンターが時期尚早に0に低下する可能性があるため)。submit(...)代わりにメソッドをオーバーライドすることはおそらく理にかなっています、remove(...)そしておそらくまたshutdown()(もしあなたがそれらを使うなら)。ただし、一般的な考え方は同じです。(しかし、考えれば考えるほど、きれいではなくなります。)

また、クラスの内部をチェックして、クラスから知識を収集できるかどうかを確認します:http: //hg.openjdk.java.net/build-infra/jdk7/jdk/file/0f8da27a3ea3/src/share/ classes / java / util / concurrent/ThreadPoolExecutor.java。このtryTerminate()方法は面白そうです。

于 2013-01-26T11:23:16.817 に答える
0

アトミックカウンターを使用して送信をカウントできます(実際に送信する前に、前述のように)。これをセマフォと組み合わせて、が提供するafterExecuteフックで解放しThreadPoolExecutorます。ビジーウェイトの代わりにsemaphore.acquire( counter.get())、最初のラウンドのジョブが送信された後に電話をかけます。ただし、後でカウンターが増える可能性があるため、acquireを呼び出す場合はacquireの数が少なすぎます。カウンターがそれ以上増加しなくなるまで、引数として最後の呼び出しからの増加を使用して、取得呼び出しをループする必要があります。

于 2013-01-26T15:32:13.693 に答える