34

を使用しExecutorServiceてタスクを実行します。このタスクは、同じタスクにサブミットされる他のタスクを再帰的に作成できExecutorService、それらの子タスクもそれを実行できます。

すべてのタスクが完了する (つまり、すべてのタスクが完了し、新しいタスクが送信されない) まで待ちたいという問題があります。

ExecutorService.shutdown()新しいタスクが によって受け入れられなくなるため、メイン スレッドで呼び出すことができませんExecutorService

Callingは、呼び出されてExecutorService.awaitTermination()いない場合は何もしないようです。shutdown

だから私はここで立ち往生しています。ExecutorServiceすべてのワーカーがアイドル状態であることを確認するのはそれほど難しいことではありませんね。私が思いつくことができる唯一の洗練されていない解決策は、 a を直接使用してThreadPoolExecutor、時々クエリを実行することgetPoolSize()です。それを行うより良い方法は本当にありませんか?

4

11 に答える 11

19

これは本当にフェイザーの理想的な候補です。Java7はこの新しいクラスで登場します。その柔軟なCountdonwLatch/CyclicBarrier。安定版はJSR166インタレストサイトで入手できます。

より柔軟なCountdownLatch/CyclicBarrierは、未知の数のパーティ(スレッド)をサポートできるだけでなく、再利用可能(フェーズ部分が入る場所)もサポートできるためです。

提出するタスクごとに登録し、そのタスクが完了すると到着します。これは再帰的に実行できます。

Phaser phaser = new Phaser();
ExecutorService e = //

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      //do work recursively if you have to

      if(shouldBeRecursive){
           phaser.register();
           e.submit(recursiveRunnable);
      }

      phaser.arrive();
   }
}

public void doWork(){
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   phaser.awaitAdvance(phase);
}

編集:前の例で競合状態を指摘してくれた@depthofrealityに感謝します。再帰関数の完了をブロックするため、実行中のスレッドが現在のフェーズの進行のみを待機するように更新しています。

フェーズ番号は、arrives== registersの数になるまでトリップしません。各再帰呼び出しが呼び出される前にregister、すべての呼び出しが完了するとフェーズ増分が発生します。

于 2011-02-10T14:43:07.890 に答える
17

再帰タスクのツリー内のタスクの数が最初に不明な場合、おそらく最も簡単な方法は、独自の同期プリミティブ、ある種の「逆セマフォ」を実装し、それをタスク間で共有することです。各タスクを送信する前に値を増やし、タスクが完了するとその値を減らし、値が 0 になるまで待ちます。

タスクから明示的に呼び出される別のプリミティブとして実装すると、このロジックがスレッド プールの実装から分離され、再帰タスクの複数の独立したツリーを同じプールに送信できるようになります。

このようなもの:

public class InverseSemaphore {
    private int value = 0;
    private Object lock = new Object();

    public void beforeSubmit() {
        synchronized(lock) {
            value++;
        }
    }

    public void taskCompleted() {
        synchronized(lock) {
            value--;
            if (value == 0) lock.notifyAll();
        }
    }

    public void awaitCompletion() throws InterruptedException {
        synchronized(lock) {
            while (value > 0) lock.wait();
        }
    }
}

可能性のある例外の影響を受けないようにするためにtaskCompleted()、ブロック内で呼び出す必要があることに注意してください。finally

またbeforeSubmit()、古いタスクが完了し、新しいタスクがまだ開始されていない場合に発生する可能性のある「誤った完了」を回避するために、タスク自体ではなく、タスクが送信される前に送信スレッドによって呼び出される必要があることに注意してください。

編集:使用パターンに関する重要な問題が修正されました。

于 2011-02-10T14:37:34.807 に答える
7

うわー、あなたたちは速いです:)

すべての提案に感謝します。事前にスケジュールされているランナブルの数がわからないため、先物はモデルと簡単に統合できません。したがって、再帰的な子タスクが終了するのを待つためだけに親タスクを存続させると、たくさんのゴミが横たわっています。

AtomicInteger の提案を使用して問題を解決しました。基本的に、ThreadPoolExecutor をサブクラス化し、execute() の呼び出しでカウンターをインクリメントし、afterExecute() の呼び出しでカウンターをデクリメントします。カウンターが 0 になったら、shutdown() を呼び出します。これは私の問題ではうまくいくようですが、それが一般的に良い方法であるかどうかはわかりません。特に、execute() のみを使用して Runnables を追加すると仮定します。

サイド ノードとして: 最初に afterExecute() で、キュー内の Runnables の数と、アクティブでシャットダウンされているワーカーの数 (それらが 0 の場合) をチェックしようとしました。しかし、すべての Runnables がキューに表示されたわけではなく、 getActiveCount() も期待どおりに機能しなかったため、これは機能しませんでした。

とにかく、これが私の解決策です:(誰かがこれに深刻な問題を見つけた場合は、私に知らせてください:)

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger executing = new AtomicInteger(0);

    public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit seconds, BlockingQueue<Runnable> queue) {
        super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
    }


    @Override
    public void execute(Runnable command) {
        //intercepting beforeExecute is too late!
        //execute() is called in the parent thread before it terminates
        executing.incrementAndGet();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        int count = executing.decrementAndGet();
        if(count == 0) {
            this.shutdown();
        }
    }

}
于 2011-02-10T15:16:00.707 に答える
3

ThreadPoolExecutorを拡張する独自のスレッドプールを作成できます。タスクがいつ送信され、いつ完了するかを知りたい。

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private int counter = 0;

    public MyThreadPoolExecutor() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public synchronized void execute(Runnable command) {
        counter++;
        super.execute(command);
    }

    @Override
    protected synchronized void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        counter--;
        notifyAll();
    }

    public synchronized void waitForExecuted() throws InterruptedException {
        while (counter == 0)
            wait();
    }
}
于 2011-05-01T10:27:07.190 に答える
1

タスクに ( を送信する代わりに) Futureを使用するRunnableと、完了時にコールバックが状態を更新するため、Future.isDoneを使用してすべてのタスクの状態を追跡できます。

于 2011-02-10T14:35:08.927 に答える
0

私が思いつくことができる唯一の洗練されていない解決策は、ThreadPoolExecutor を直接使用し、その getPoolSize() を時々クエリすることです。それを行うより良い方法は本当にありませんか?

shutdown() ,適切な順序でawaitTermination()and shutdownNow()メソッドを使用する必要があります。

shutdown(): 以前にサブミットされたタスクが実行される通常のシャットダウンを開始しますが、新しいタスクは受け入れられません。

awaitTermination(): シャットダウン要求の後、すべてのタスクが実行を完了するか、タイムアウトが発生するか、または現在のスレッドが中断されるかのいずれかが最初に発生するまでブロックします。

shutdownNow(): アクティブに実行中のすべてのタスクの停止を試み、待機中のタスクの処理を停止し、実行を待機していたタスクのリストを返します。

ExecutorServiceのOracleドキュメントページからの推奨される方法:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }

以下のように、タスクの完了に長時間かかる場合は、 if 条件を while 条件に置き換えることができます。

変化する

if (!pool.awaitTermination(60, TimeUnit.SECONDS))

 while(!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     Thread.sleep(60000);
 }  

join()で他の代替手段を参照できます (スタンドアロン スレッドで使用できるを除く)。

すべてのスレッドが Java での作業を完了するまで待ちます

于 2016-09-02T15:05:32.477 に答える
0

JSR166y クラス (Phaser や Fork/Join など) を使用したい場合は、どちらでも動作する可能性があります。それらの Java 6 バックポートをhttp://gee.cs.oswego.edu/dl/concurrencyからいつでもダウンロードできます。 -interest/完全に自作のソリューションを書くのではなく、それを基礎として使用します。その後、7 が出てきたら、バックポートへの依存関係を削除して、いくつかのパッケージ名を変更するだけです。

(完全な開示: しばらくの間、本番環境で LinkedTransferQueue を使用してきました。問題はありません)

于 2011-05-03T09:39:33.197 に答える
0

(私の過ち: 就寝時刻を「少し」過ぎています ;) しかし、これは動的ラッチの最初の試みです):

package oss.alphazero.sto4958330;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class DynamicCountDownLatch {
    @SuppressWarnings("serial")
    private static final class Sync extends AbstractQueuedSynchronizer {
        private final CountDownLatch toplatch;
        public Sync() {
            setState(0);
            this.toplatch = new CountDownLatch(1);
        }

        @Override
        protected int tryAcquireShared(int acquires){
            try {
                toplatch.await();
            } 
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted", e);
            }
            return getState() == 0 ? 1 : -1;
        }
        public boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) 
                    return nextc == 0;
            }
        }
        public boolean tryExtendState(int acquires) {
            for (;;) {
                int s = getState();
                int exts = s+1;
                if (compareAndSetState(s, exts)) {
                    toplatch.countDown();
                    return exts > 0;
                }
            }
        }
    }
    private final Sync sync;
    public DynamicCountDownLatch(){
        this.sync = new Sync();
    }
    public void await() 
        throws InterruptedException   
    {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit   unit) 
        throws InterruptedException   
    {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public void join() {
        sync.tryExtendState(1);
    }
}

このラッチは、新しいメソッド join() を既存の (複製された) CountDownLatch API に導入します。これは、タスクがより大きなタスク グループへのエントリを通知するために使用されます。

ラッチは、親タスクから子タスクに渡されます。各タスクは、Suraj のパターンに従って、最初にラッチを「join()」し、その task() を実行し、次に countDown() を実行します。

メイン スレッドがタスク グループを起動し、すぐに awaits() (タスク スレッドが join() を実行する前に) を実行する状況に対処するために、topLatchint 内部Syncクラスが使用されます。これは、join(); ごとにカウントダウンされるラッチです。もちろん、重要なのは最初のカウントダウンだけです。後続のカウントダウンはすべて nop です。

上記の初期実装では、tryAcquiredShared(int) が InterruptedException をスローすることは想定されていないため、一種のセマンティック シワが生じますが、topLatch の待機時に割り込みを処理する必要があります。

これは、アトミックカウンターを使用したOP独自のソリューションの改善ですか? 彼がエグゼキューターの使用に固執している IFF ではないと思いますが、その場合、AQS を使用した同等に有効な代替アプローチであり、汎用スレッドでも使用できると思います。

仲間のハッカーを批判します。

于 2011-05-01T07:19:18.863 に答える
0

CountDownLatchを使用します。CountDownLatch オブジェクトを各タスクに渡し、タスクを次のようにコーディングします。

public void doTask() {
    // do your task
    latch.countDown(); 
}

待機する必要があるスレッドは、次のコードを実行する必要があります。

public void doWait() {
    latch.await();
}

ただし、もちろん、これは子タスクの数を既に知っていることを前提としているため、ラッチのカウントを初期化できます。

于 2011-02-10T14:33:53.690 に答える
0

実行中のスレッドを追跡するランナーを使用できます。

Runner runner = Runner.runner(numberOfThreads);

runner.runIn(2, SECONDS, callable);
runner.run(callable);


// blocks until all tasks are finished (or failed)
runner.waitTillDone();


// and reuse it
runner.runRunnableIn(500, MILLISECONDS, runnable);


runner.waitTillDone();


// and then just kill it
runner.shutdownAndAwaitTermination();

それを使用するには、依存関係を追加するだけです:

「com.github.matejtimes:javafixes:1.3.0」をコンパイルします

于 2019-06-20T23:27:00.787 に答える