8

膨大な量のタスクがあります。各タスクは 1 つのグループに属します。要件は、タスクの各グループが単一スレッドで実行されるのと同じように連続して実行され、スループットがマルチコア (またはマルチ CPU) 環境で最大化される必要があることです。注: タスクの数に比例する膨大な量のグループもあります。

単純な解決策は、ThreadPoolExecutor と同期 (またはロック) を使用することです。ただし、スレッドは互いにブロックし合い、スループットは最大化されません。

もっと良いアイデアはありますか?または、要件を満たすサードパーティのライブラリが存在しますか?

4

5 に答える 5

3

簡単な方法は、すべてのグループ タスクを 1 つのスーパー タスクに「連結」して、サブタスクを連続して実行することです。ただし、これにより、他のグループが完全に終了し、スレッド プールにスペースが確保されない限り、開始されない他のグループで遅延が発生する可能性があります。

別の方法として、グループのタスクを連鎖させることを検討してください。次のコードはそれを示しています。

public class MultiSerialExecutor {
    private final ExecutorService executor;

    public MultiSerialExecutor(int maxNumThreads) {
        executor = Executors.newFixedThreadPool(maxNumThreads);
    }

    public void addTaskSequence(List<Runnable> tasks) {
        executor.execute(new TaskChain(tasks));
    }

    private void shutdown() {
        executor.shutdown();
    }

    private class TaskChain implements Runnable {
        private List<Runnable> seq;
        private int ind;

        public TaskChain(List<Runnable> seq) {
            this.seq = seq;
        }

        @Override
        public void run() {
            seq.get(ind++).run(); //NOTE: No special error handling
            if (ind < seq.size())
                executor.execute(this);
        }       
    }

利点は、余分なリソース (スレッド/キュー) が使用されていないこと、およびタスクの粒度が単純なアプローチの場合よりも優れていることです。不利な点は、すべてのグループのタスクを事前に知っておく必要があることです。

- 編集 -

このソリューションを汎用的かつ完全なものにするために、エラー処理 (つまり、エラーが発生してもチェーンを継続するかどうか) を決定する必要がある場合があります。また、ExecutorService を実装し、すべての呼び出しを基になるエグゼキューターに委任することをお勧めします。

于 2010-07-15T08:51:57.723 に答える
2

タスク キューを使用することをお勧めします。

  • タスクのグループごとにキューを作成し、そのグループのすべてのタスクをそこに挿入しました。
  • これで、1 つのキュー内のタスクを順次実行しながら、すべてのキューを並行して実行できるようになりました。

Google で簡単に検索すると、Java API 自体にはタスク/スレッド キューがないことがわかります。ただし、コーディングに関するチュートリアルは多数あります。あなたがいくつか知っているなら、誰もが良いチュートリアル/実装を自由にリストしてください:

于 2010-07-15T08:28:04.123 に答える
1

私は Dave の答えにほぼ同意しますが、CPU 時間をすべての「グループ」にわたってスライスする必要がある場合、つまりすべてのタスク グループが並行して進行する必要がある場合は、この種の構造が役立つ場合があります (削除を「ロック」として使用します。私の場合は、より多くのメモリを使用する傾向があると思いますが):

class TaskAllocator {
    private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork
         = childQueuePerTaskGroup();

    public Queue<Runnable> lockTaskGroup(){
        return entireWork.poll();
    }

    public void release(Queue<Runnable> taskGroup){
        entireWork.offer(taskGroup);
    }
 }

 class DoWork implmements Runnable {
     private final TaskAllocator allocator;

     public DoWork(TaskAllocator allocator){
         this.allocator = allocator;
     }

     pubic void run(){
        for(;;){
            Queue<Runnable> taskGroup = allocator.lockTaskGroup();
            if(task==null){
                //No more work
                return;
            }
            Runnable work = taskGroup.poll();
            if(work == null){
                //This group is done
                continue;
            }

            //Do work, but never forget to release the group to 
            // the allocator.
            try {
                work.run();
            } finally {
                allocator.release(taskGroup);
            }
        }//for
     }
 }

その後、最適な数のスレッドを使用してDoWorkタスクを実行できます。ラウンド ロビンの負荷分散のようなものです。

単純なキューの代わりにこれを使用することで、より洗練された何かを行うこともできますTaskAllocator(より多くのタスクが残っているタスク グループが実行される傾向があります)。

ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue = 
    new ConcurrentSkipListSet(new SophisticatedComparator());

どこSophisticatedComparatorですか

class SophisticatedComparator implements Comparator<MyQueue<Runnable>> {
    public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){
        int diff = o2.size() - o1.size();
        if(diff==0){
             //This is crucial. You must assign unique ids to your 
             //Subqueue and break the equality if they happen to have same size.
             //Otherwise your queues will disappear...
             return o1.id - o2.id;
        }
        return diff;
    }
 }
于 2010-07-15T08:48:14.543 に答える
0

アクターも、この特定のタイプの問題に対する別のソリューションです。Scala にはアクターがあり、AKKA が提供する Java もあります。

于 2010-10-14T08:32:16.757 に答える
-2

私はあなたと同様の問題を抱えていました。私はExecutorCompletionServiceで動作するを使用してExecutor、タスクのコレクションを完了しました。以下は、Java7 以降の java.util.concurrent API からの抜粋です。

特定の問題に対する一連のソルバーがあり、それぞれがある種の Result の値を返し、それらを同時に実行して、null 以外の値を返すそれぞれの結果を何らかのメソッド use(Result) で処理するとします。 r)。これは次のように記述できます。

void solve(Executor e, Collection<Callable<Result>> solvers)
        throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers)
        ecs.submit(s);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
        Result r = ecs.take().get();
        if (r != null)
            use(r);
    }
}

したがって、シナリオでは、すべてのタスクは単一Callable<Result>の になり、タスクは にグループ化されますCollection<Callable<Result>>

参照: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html

于 2014-10-28T16:07:01.407 に答える