7

タスクのリストを実行するために固定スレッドプールでjava.util.concurrent.ExecutorServiceを使用しています。私のタスク リストは通常​​ 80 ~ 150 程度で、以下に示すように、一度に実行するスレッドの数を 10 に制限しています。

ExecutorService threadPoolService = Executors.newFixedThreadPool(10);

for ( Runnable task : myTasks ) 
{     
    threadPoolService.submit(task); 
}

私のユースケースでは、完了したタスクでもExecutorServiceに再送信する必要がありますが、既に送信されたすべてのタスクが処理/完了された場合にのみ実行/実行する必要があります。つまり、基本的に、提出されたタスクはローテーションベースで実行する必要があります。threadPoolService.shutdown()したがって、この場合、いずれかまたはthreadPoolService.shutdownNow()呼び出しはありません。

私の質問は、回転ベースのタスクにサービスを提供するExecutorServiceをどのように実装するのですか?

4

4 に答える 4

12

ThreadPoolExecutor は、ジョブをキューの最後に戻すことができる afterExecution の拡張ポイントを提供します。

public class TaskRepeatingThreadPoolExecutor extends ThreadPoolExecutor {

    public TaskRepeatingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        this.submit(r);
    }
}

もちろん、 の便利なファクトリ メソッドの助けを借りずに自分でインスタンス化するには、もう少し作業を行う必要がありますExecutorServiceが、コンストラクターは理解するのに十分単純です。

于 2012-04-27T07:07:54.113 に答える
1

標準ライブラリの同時実行ユーティリティに存在する機能を完全に使用する次のソリューションをお勧めします。これはCyclicBarrier、タスクデコレータクラスと、すべてのタスクを再送信するバリアアクションを使用します。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Rotation {

    private static final class RotationDecorator implements Runnable {
        private final Runnable          task;
        private final CyclicBarrier barrier;


        RotationDecorator( Runnable task, CyclicBarrier barrier ) {
            this.task = task;
            this.barrier = barrier;
        }


        @Override
        public void run() {
            this.task.run();
            try {
                this.barrier.await();
            } catch(InterruptedException e) {
                ; // Consider better exception handling
            } catch(BrokenBarrierException e) {
                ; // Consider better exception handling
            }
        }
    }


    public void startRotation( List<Runnable> tasks ) {
        final ExecutorService threadPoolService = Executors.newFixedThreadPool( 10 );
        final List<Runnable> rotatingTasks = new ArrayList<Runnable>( tasks.size() );
        final CyclicBarrier barrier = new CyclicBarrier( tasks.size(), new Runnable() {
            @Override
            public void run() {
                Rotation.this.enqueueTasks( threadPoolService, rotatingTasks );
            }
        } );
        for(Runnable task : tasks) {
            rotatingTasks.add( new RotationDecorator( task, barrier ) );
        }
        this.enqueueTasks( threadPoolService, rotatingTasks );
    }


    private void enqueueTasks( ExecutorService service, List<Runnable> tasks ) {
        for(Runnable task : tasks) {
            service.submit( task );
        }
    }

}
于 2012-04-27T07:45:29.917 に答える
1

答えは、 のインスタンスに使用されるワーク キューの実装に関連していますExecutorService。だから、私はお勧めします:

  1. まず、循環キュー機能を提供するjava.util.concurrent.BlockingQueue(例) の実装を選択します。、選択された理由は、次のタスクがキューに提供されるまで待機するためです。そのため、循環 + ブロッキングキューの場合、同じ動作と機能を提供する方法に注意する必要があります。BlockingQueue

  2. Executors.new...を使用して新しいものを作成する代わりに、次のような直接コンストラクターThreadPoolExecutorを使用します

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

このように、エグゼキューターに を命令しない限り、タスクの循環コンテナーであるワーク キューshutdownから実行するためにキューから次のタスクを取得しようとします。

于 2012-04-27T07:14:08.637 に答える
1

たとえば、次のように、すべてのタスクが実行されたことを確認し、実行されたら再送信できます。

    List<Future> futures = new ArrayList<>();
    for (Runnable task : myTasks) {
        futures.add(threadPoolService.submit(task));
    }
    //wait until completion of all tasks
    for (Future f : futures) {
        f.get();
    }
    //restart
    ......

EDIT
タスクが完了したらすぐに再送信したいようです。ExecutorCompletionServiceを使用すると、タスクが実行されたときにタスクを取得できます。完了後すぐに数回再送信される 2 つのタスクの簡単な例を以下に示します。出力例:

タスク 1 は pool-1-thread-1 を
提出しました タスク 2 は pool-1-thread-2 を提出しましたタスク 1 は pool-1-thread
-1 を完了しました タスク
1 は pool-1-thread-3 を提出しました タスク 1 が完了した pool-1-thread-3 タスク 2 が提出された pool-1-thread-4 タスク 1 が提出された pool-1-thread-5 タスク 1 が完了した pool-1-thread-5 タスク 2 が完了した pool-1-thread-4





public class Test1 {

    public final ConcurrentMap<String, String> concurrentMap = new ConcurrentHashMap<>();
    public final AtomicInteger retries = new AtomicInteger();
    public final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int count = 0;
        List<Runnable> myTasks = new ArrayList<>();
        myTasks.add(getRunnable(1));
        myTasks.add(getRunnable(2));
        ExecutorService threadPoolService = Executors.newFixedThreadPool(10);
        CompletionService<Runnable> ecs = new ExecutorCompletionService<Runnable>(threadPoolService);
        for (Runnable task : myTasks) {
            ecs.submit(task, task);
        }
        //wait until completion of all tasks
        while(count++ < 3) {
            Runnable task = ecs.take().get();
            ecs.submit(task, task);
        }
        threadPoolService.shutdown();
    }

    private static Runnable getRunnable(final int i) {
        return new Runnable() {

            @Override
            public void run() {
                System.out.println("Task " + i + " submitted " + Thread.currentThread().getName() + "  ");
                try {
                    Thread.sleep(500 * i);
                } catch (InterruptedException ex) {
                    System.out.println("Interrupted");
                }
                System.out.println("Task " + i + " completed " + Thread.currentThread().getName() + "  ");
            }
        };
    }
}
于 2012-04-27T11:22:23.583 に答える