23

基本的に実行する必要があるタスクのリストをスキャンし、各タスクをエグゼキュータに渡して実行する次のコード スニペットがあります。

次にJobExecutor、別のエグゼキューターを作成し (db の処理を​​行うため...キューへのデータの読み取りと書き込みを行うため)、タスクを完了します。

JobExecutorFuture<Boolean>サブミットされたタスクに対してa を返します。タスクの 1 つが失敗した場合、すべてのスレッドを正常に中断し、すべての例外をキャッチしてエグゼキューターをシャットダウンしたいと考えています。どのような変更を行う必要がありますか?

public class DataMovingClass {
    private static final AtomicInteger uniqueId = new AtomicInteger(0);

  private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();   

  ThreadPoolExecutor threadPoolExecutor  = null ;

   private List<Source> sources = new ArrayList<Source>();

    private static class IDGenerator extends ThreadLocal<Integer> {
        @Override
        public Integer get() {
            return uniqueId.incrementAndGet();
        }
  }

  public void init(){

    // load sources list

  }

  public boolean execute() {

    boolean succcess = true ; 
    threadPoolExecutor = new ThreadPoolExecutor(10,10,
                10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("DataMigration-" + uniqueNumber.get());
                        return t;
                    }// End method
                }, new ThreadPoolExecutor.CallerRunsPolicy());

     List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();

     for (Source source : sources) {
                    result.add(threadPoolExecutor.submit(new JobExecutor(source)));
     }

     for (Future<Boolean> jobDone : result) {
                try {
                    if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
                        // in case of successful DbWriterClass, we don't need to change
                        // it.
                        success = false;
                    }
                } catch (Exception ex) {
                    // handle exceptions
                }
            }

  }

  public class JobExecutor implements Callable<Boolean>  {

        private ThreadPoolExecutor threadPoolExecutor ;
        Source jobSource ;
        public SourceJobExecutor(Source source) {
            this.jobSource = source;
            threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                    new ThreadFactory() {
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("Job Executor-" + uniqueNumber.get());
                            return t;
                        }// End method
                    }, new ThreadPoolExecutor.CallerRunsPolicy());
        }

        public Boolean call() throws Exception {
            boolean status = true ; 
            System.out.println("Starting Job = " + jobSource.getName());
            try {

                        // do the specified task ; 


            }catch (InterruptedException intrEx) {
                logger.warn("InterruptedException", intrEx);
                status = false ;
            } catch(Exception e) {
                logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);
                status = false ;
            }
           System.out.println("Ending Job = " + jobSource.getName());
            return status ;
        }
    }
}   
4

3 に答える 3

19

タスクをエグゼキュータに送信すると、FutureTaskインスタンスが返されます。

FutureTask.get()は、タスクによってスローされた例外を として再スローしますExecutorException

そのため、 getを反復して呼び出して getを呼び出すと、通常のシャットダウンをList<Future>キャッチして呼び出すことができます。ExecutorException

于 2010-03-31T16:51:38.600 に答える
3

サブクラス化し、そのメソッドThreadPoolExecutorをオーバーライドします。protected afterExecute (Runnable r, Throwable t)

コンビニエンス クラスを介してスレッド プールを作成している場合java.util.concurrent.Executors(そうではない場合)、そのソースを見て、 ThreadPoolExecutorがどのように呼び出されているかを確認してください。

于 2013-04-14T09:20:49.797 に答える