13

私のアプリケーションの 1 つで、ExecutorServiceクラスを使用して固定スレッド プールを作成CountDownLatchし、スレッドが完了するのを待ちます。プロセスが例外をスローしなかった場合、これは正常に機能しています。いずれかのスレッドで例外が発生した場合は、実行中のすべてのスレッドを停止し、メイン スレッドにエラーを報告する必要があります。誰でもこれを解決するのを手伝ってもらえますか?

これは、複数のスレッドを実行するために使用しているサンプル コードです。

    private void executeThreads()
    {
        int noOfThreads = 10;
        ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);     
        try      
       {
        CountDownLatch latch = new CountDownLatch(noOfThreads);
        for(int i=0; i< noOfThreads; i++){
         executor.submit(new ThreadExecutor(latch));
        }
        latch.await();           
       }
       catch(Exception e)
       {
        e.printStackTrace();
       }
       finally
       {
        executor.shutDown();
       }
   }

これは実行者クラスです

     public class ThreadExecutor implements Callable<String> {
        CountDownLatch latch ;
        public ThreadExecutor(CountDownLatch latch){
            this.latch = latch;
        }   

    @Override
    public String call() throws Exception
    {
        doMyTask(); // process logic goes here!
        this.latch.countDown();
        return "Success";
    }

================================================== ===========================

皆さん、ありがとうございました :)

以下のようにクラスを修正しましたが、現在は機能しています。

private void executeThreads()
    {
        int noOfThreads = 10;
        ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);     
       ArrayList<Future<Object>> futureList = new ArrayList<Future<Object>>(noOfThreads );
    try
    {
        userContext = BSF.getMyContext();
        CountDownLatch latch = new CountDownLatch(noOfComponentsToImport);

        for(ImportContent artifact:artifactList){
            futureList.add(executor.submit(new ThreadExecutor(latch)));
        }

        latch.await();

        for(Future<Object> future : futureList)
        {
                  try
                  {
                      future.get();                 
                   }
                   catch(ExecutionException e)
                   {   //handle it               
                    }
        }           

    }
    catch (Exception e) {
       //handle it
    }
    finally
    {
        executor.shutdown();      

        try
        {
            executor.awaitTermination(90000, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
           //handle it
        }
    }
   }

エグゼキュータ クラス:

public class ThreadExecutor implements Callable<String> {
        private static volatile boolean isAnyError;
        CountDownLatch latch ;
        public ThreadExecutor(CountDownLatch latch){
            this.latch = latch;
        }   

    @Override
    public String call() throws Exception
    {

      try{
            if(!isAnyError)
            { 
               doMyTask(); // process logic goes here!
            }
     }
     catch(Exception e)
     {
        isAnyError = true ;
        throw e;
      }
      finally
      {
        this.latch.countDown();
       }
        return "Success";
    }
4

4 に答える 4

5

を使用してExecutorCompletionServiceExecutorServiceタスクの期間を超えて存続する (つまり、その後シャットダウンされない) を使用します。

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Threader {

    static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        new Threader().start();
        service.shutdown();
    }

    private void start() {
        CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
                service);
        /*
         * Holds all the futures for the submitted tasks
         */
        List<Future<Void>> results = new ArrayList<Future<Void>>();

        for (int i = 0; i < 3; i++) {
            final int callableNumber = i;

            results.add(completionService.submit(new Callable<Void>() {

                                                     @Override
                                                     public Void call() throws Exception {
                                                         System.out.println("Task " + callableNumber
                                                                 + " in progress");
                                                         try {
                                                             Thread.sleep(callableNumber * 1000);
                                                         } catch (InterruptedException ex) {
                                                             System.out.println("Task " + callableNumber
                                                                     + " cancelled");
                                                             return null;
                                                         }
                                                         if (callableNumber == 1) {
                                                             throw new Exception("Wrong answer for task "
                                                                     + callableNumber);
                                                         }
                                                         System.out.println("Task " + callableNumber + " complete");
                                                         return null;
                                                     }
                                                 }

            ));
        }

        boolean complete = false;
        while (!complete) {
            complete = true;
            Iterator<Future<Void>> futuresIt = results.iterator();
            while (futuresIt.hasNext()) {
                if (futuresIt.next().isDone()) {
                    futuresIt.remove();
                } else {
                    complete = false;
                }
            }

            if (!results.isEmpty()) {
                try {
                /*
                 * Examine results of next completed task
                 */
                    completionService.take().get();
                } catch (InterruptedException e) {
                /*
                 * Give up - interrupted.
                 */
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                /*
                 * The task threw an exception
                 */
                    System.out.println("Execution exception " + e.getMessage());
                    complete = true;
                    for (Future<Void> future : results) {
                        if (!future.isDone()) {
                            System.out.println("Cancelling " + future);
                            future.cancel(true);
                        }
                    }
                }
            }
        }

    }
}

出力は次のようなものです。

Task 0 in progress
Task 2 in progress
Task 1 in progress
Task 0 complete
Execution exception java.lang.Exception: Wrong answer for task 1
Cancelling java.util.concurrent.FutureTask@a59698
Task 2 cancelled

ここで、タスク 1 の失敗によりタスク 2 がキャンセルされました。

于 2012-08-16T09:38:08.997 に答える
4

堅牢なメカニズムを使用してラッチをカウントダウンすることを強くお勧めします。別のメカニズムを使用して、包括的なtry-finally { latch.countDown(); }スレッド内のエラーを検出します。

于 2012-08-16T09:04:34.707 に答える
1

AtomicBooleanforの値をチェックする「ウォッチャー」と呼ばれるスレッドがもう 1 つ必要になると思いますtrue。設定したら、メインの実行サービスをシャットダウンします。シャットダウン メカニズムは、すべてのスレッドの即時停止を保証しないことに注意してください。たとえば、これを読んでください:スレッドとエグゼキュータのグレースフルシャットダウン

于 2012-08-16T09:10:55.017 に答える
1

コードを再構築する必要があると思います。ExecutorService#invokeAnyを見てみましょう

指定されたタスクを実行し、正常に完了した (つまり、例外をスローせずに) 完了したタスクがあれば、その結果を返します。通常または例外的な復帰時に、完了していないタスクはキャンセルされます。この操作の進行中に指定されたコレクションが変更された場合、このメソッドの結果は未定義です。

これは必要な動作のようです。CountDownLatchまた、 as mainwill ブロッ​​クは必要ありませんinvokeAny

于 2012-08-16T09:15:18.773 に答える