84

このブログ投稿で CompletionService を見つけました。ただし、これは、標準の ExecutorService に対する CompletionService の利点を実際に示しているわけではありません。どちらでも同じコードを書くことができます。では、いつ CompletionService が役立つのでしょうか?

明確にするために短いコードサンプルを提供できますか? たとえば、このコード サンプルは、CompletionService が不要な場所を示しているだけです (= ExecutorService と同等)。

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
    //        CompletionService<Long> taskCompletionService =
    //                new ExecutorCompletionService<Long>(taskExecutor);
    Callable<Long> callable = new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    };

    Future<Long> future = // taskCompletionService.submit(callable);
        taskExecutor.submit(callable);

    while (!future.isDone()) {
        // Do some work...
        System.out.println("Working on something...");
    }
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
4

11 に答える 11

168

多くの詳細を省略:

  • ExecutorService = 着信キュー + ワーカー スレッド
  • CompletionService = 着信キュー + ワーカー スレッド + 出力キュー
于 2011-02-08T04:38:59.057 に答える
109

ではExecutorService、実行するタスクを送信したら、完了したタスクの結果を効率的に取得するために手動でコーディングする必要があります。

ではCompletionService、これはかなり自動化されています。提出したタスクは 1 つだけであるため、提示したコードでは違いはあまり明らかではありません。ただし、提出するタスクのリストがあるとします。以下の例では、複数のタスクが CompletionService に送信されます。次に、どのタスクが完了したか (結果を取得するため) を見つけようとする代わりに、CompletionService インスタンスに、結果が利用可能になったときに結果を返すように要求するだけです。

public class CompletionServiceTest {

        class CalcResult {
             long result ;

             CalcResult(long l) {
                 result = l;
             }
        }

        class CallableTask implements Callable<CalcResult> {
            String taskName ;
            long  input1 ;
            int input2 ;

            CallableTask(String name , long v1 , int v2 ) {
                taskName = name;
                input1 = v1;
                input2 = v2 ;
            }

            public CalcResult call() throws Exception {
                System.out.println(" Task " + taskName + " Started -----");
                for(int i=0;i<input2 ;i++) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        System.out.println(" Task " + taskName + " Interrupted !! ");
                        e.printStackTrace();
                    }
                    input1 += i;
                }
                System.out.println(" Task " + taskName + " Completed @@@@@@");
                return new CalcResult(input1) ;
            }

        }

        public void test(){
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor);

            int submittedTasks = 5;
            for (int i=0;i< submittedTasks;i++) {
                taskCompletionService.submit(new CallableTask (
                        String.valueOf(i), 
                            (i * 10), 
                            ((i * 10) + 10  )
                        ));
               System.out.println("Task " + String.valueOf(i) + "subitted");
            }
            for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) {
                try {
                    System.out.println("trying to take from Completion service");
                    Future<CalcResult> result = taskCompletionService.take();
                    System.out.println("result for a task availble in queue.Trying to get()");
                    // above call blocks till atleast one task is completed and results availble for it
                    // but we dont have to worry which one

                    // process the result here by doing result.get()
                    CalcResult l = result.get();
                    System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result));

                } catch (InterruptedException e) {
                    // Something went wrong with a task submitted
                    System.out.println("Error Interrupted exception");
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // Something went wrong with the result
                    e.printStackTrace();
                    System.out.println("Error get() threw exception");
                }
            }
        }
    }
于 2011-04-07T11:05:12.293 に答える
13

基本的に、CompletionService複数のタスクを並行して実行し、それらを完了順に処理する場合は、を使用します。したがって、5 つのジョブを実行するCompletionServiceと、最初に終了したジョブが返されます。タスクが 1 つしかない例ではExecutorCallable.

于 2011-02-06T22:19:58.627 に答える
11

CompletionServicejavadoc は、が有用な場合とそうでない場合の質問に最もよく答えると思いますExecutorService

新しい非同期タスクの生成を、完了したタスクの結果の消費から切り離すサービス。

基本的に、このインターフェースにより、プログラムは、タスクの結果の他のコンシューマーを知らなくても、タスクを作成および送信する (さらには、それらの送信の結果を調べる) プロデューサーを持つことができます。一方、コンシューマーは、プロデューサーがタスクをサブミットしていることを意識せずに、CompletionService可能性pollまたは結果を認識しています。take

記録のために、かなり遅れているので間違っている可能性がありますが、そのブログ投稿のサンプル コードがメモリ リークを引き起こしていることはほぼ確実です。アクティブなコンシューマーがExecutorCompletionServiceの内部キューから結果を取得していないため、ブロガーがそのキューがどのように排出されると予想したかはわかりません。

于 2011-02-06T08:34:42.937 に答える
4

まず第一に、プロセッサの時間を無駄にしたくない場合は、

while (!future.isDone()) {
        // Do some work...
}

使用する必要があります

service.shutdown();
service.awaitTermination(14, TimeUnit.DAYS);

このコードの悪い点は、シャットダウンすることExecutorServiceです。作業を続けたい場合 (つまり、再帰的なタスクを作成する場合)、2 つの選択肢があります: invokeAll またはExecutorService.

invokeAllすべてのタスクが完了するまで待機します。ExecutorService結果を 1 つずつ取得またはポーリングする機能を提供します。

最後に、再帰的な例:

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);

while (Tasks.size() > 0) {
    for (final Task task : Tasks) {
        completionService.submit(new Callable<String>() {   
            @Override
            public String call() throws Exception {
                return DoTask(task);
            }
        });
    } 

    try {                   
        int taskNum = Tasks.size();
        Tasks.clear();
        for (int i = 0; i < taskNum; ++i) {
            Result result = completionService.take().get();
            if (result != null)
                Tasks.add(result.toTask());
        }           
    } catch (InterruptedException e) {
    //  error :(
    } catch (ExecutionException e) {
    //  error :(
    }
}
于 2012-02-03T17:46:22.387 に答える
1

実行時に自分で確認し、両方のソリューション (Executorservice と Completionservice) を実装してみてください。それらの動作の違いがわかり、どちらをいつ使用するかがより明確になります。http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.htmlが必要な場合は、ここに例があります

于 2013-01-15T10:04:03.260 に答える
1

5 つの長時間実行タスク (呼び出し可能なタスク) があり、それらのタスクを実行サービスに送信したとします。ここで、5 つのタスクすべてが競合するのを待つのではなく、いずれかが完了した場合にこれらのタスクに対して何らかの処理を行いたいと想像してください。これは、将来のオブジェクトにポーリング ロジックを記述するか、この API を使用することで実行できます。

于 2015-07-31T11:42:32.613 に答える
1
package com.barcap.test.test00;

import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorCompletest00 {

    public static void main(String[] args) {

        ExecutorService exc= Executors.newFixedThreadPool( 10 );
        ExecutorCompletionService executorCompletionService= new ExecutorCompletionService( exc );

        for (int i=1;i<10;i++){
            Task00 task00= new Task00( i );
            executorCompletionService.submit( task00 );
        }
        for (int i=1;i<20;i++){
            try {
                Future<Integer> future= (Future <Integer>) executorCompletionService.take();
                Integer inttest=future.get();
                System.out.println(" the result of completion service is "+inttest);

               break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

================================================== =====

package com.barcap.test.test00;

import java.util.*;
import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorServ00 {

    public static void main(String[] args) {
        ExecutorService executorService=Executors.newFixedThreadPool( 9 );
        List<Future> futList= new ArrayList <>(  );
        for (int i=1;i<10;i++) {
           Future result= executorService.submit( new Task00( i ) );
           futList.add( result );
        }

         for (Future<Integer> futureEach :futList ){
             try {
              Integer inm=   futureEach.get();

                 System.out.println("the result of future executorservice is "+inm);
                 break;
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } catch (ExecutionException e) {
                 e.printStackTrace();
             }
         }
    }
}

================================================== =========

package com.barcap.test.test00;

import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class Task00 implements Callable<Integer> {

    int i;

    public Task00(int i) {
        this.i = i;
    }

    @Override
    public Integer call() throws Exception {
        System.out.println(" the current thread is "+Thread.currentThread().getName()  +" the result should be "+i);
        int sleepforsec=100000/i;
         Thread.sleep( sleepforsec );
        System.out.println(" the task complted for "+Thread.currentThread().getName()  +" the result should be "+i);



        return i;
    }
}

================================================== ====================

Executor 完了サービスのログの違い:現在のスレッドは pool-1-thread-1 です 結果は 1 になります 現在のスレッドは pool-1-thread-2 です 結果は 2 になります 現在のスレッドは pool-1-thread-3 です 結果は 3 になります 現在のスレッドはスレッドは pool-1-thread-4 です 結果は 4 になります 現在のスレッドは pool-1-thread-6 です 結果は 6 になります 現在のスレッドは pool-1-thread-5 です 結果は 5 になります 現在のスレッドはpool-1-thread-7 結果は 7 になります。現在のスレッドは pool-1-thread-9 です。結果は 9 になります。現在のスレッドは pool-1-thread-8 です。結果は 8 になります。 1-thread-9 結果は 9 になるはずです 結果は 9 になります プール 1-スレッド 8 のタスクが完了しました 結果は 8 になります プール 1-スレッド 7 のタスクが完了しました 結果は 7 になります タスクが完了しましたpool-1-thread-6 結果は 6 タスクが完了したはずですpool-1-thread-5 結果は 5 である必要があります。 pool-1-thread-4 のタスクが完了しました。結果は 4 である必要があります。 pool-1-thread-3 のタスクが完了しました。結果は 3 である必要があります。

pool-1-thread-2 のタスクが完了した場合、結果は 2 になるはずです

現在のスレッドは pool-1-thread-1 です 結果は 1 になります 現在のスレッドは pool-1-thread-3 です 結果は 3 になります 現在のスレッドは pool-1-thread-2 です 結果は 2 になります 現在のスレッドはスレッドは pool-1-thread-5 です 結果は 5 になります 現在のスレッドは pool-1-thread-4 です 結果は 4 になります 現在のスレッドは pool-1-thread-6 です 結果は 6 になります 現在のスレッドはpool-1-thread-7 結果は 7 である必要があります。現在のスレッドは pool-1-thread-8 です。結果は 8 である必要があります。現在のスレッドは pool-1-thread-9 です。結果は 9 である必要があります。 1-thread-9 結果は 9 である必要があります。タスクはプール 1-スレッド 8 で完了しました。結果は 8 である必要があります。タスクはプール 1-スレッド 7 で完了しました。結果は 7 である必要があります。 thread-6 結果は 6 である必要があります タスクはプール 1 に対して完了しました-thread-5 結果5 である必要があります プール 1-スレッド 4 で完了したタスク 結果は 4 である必要があります プール 1-スレッド 3 で完了したタスク 結果は 3 である必要があります プール 1-スレッド 2 で完了したタスク 結果は次のとおりです2 pool-1-thread-1 のタスクが完了しました。結果は 1 である必要があります。将来の結果は 1 です。

================================================== =====

executorservice の場合、結果はすべてのタスクが完了した後にのみ利用可能になります。

エグゼキュータの完了サービスは、利用可能な結果を​​返します。

于 2019-04-25T16:09:25.597 に答える
0

完了サービスを使用する別の利点があります:パフォーマンス

を呼び出すとfuture.get()、スピン待ちになります。

からjava.util.concurrent.CompletableFuture

  private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }

長時間実行されるタスクがある場合、これはパフォーマンスの惨事になります。

完了サービスを使用すると、タスクが完了すると、その結果がキューに入れられ、パフォーマンスの低いオーバーハンドでキューをポーリングできます。

done完了サービスは、フック付きのラップ タスクを使用してこれを実現します。

java.util.concurrent.ExecutorCompletionService

    private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}
于 2018-10-25T02:41:31.667 に答える
0

タスク プロデューサーが結果に関心がなく、executor サービスによって実行された非同期タスクの結果を処理するのが別のコンポーネントの責任である場合は、CompletionService を使用する必要があります。タスク結果プロセッサをタスク プロデューサーから分離するのに役立ちます。例を参照してください http://www.zoftino.com/java-concurrency-executors-framework-tutorial

于 2018-06-11T15:01:07.827 に答える