0

終了時にエグゼキューターのステータスを読み取り、失敗時に残りのタスクをキャンセルするために、別のクラス内からこのスレッドを作成しました。タスクは実行可能です

何らかの障害が見られる場合、全体的なステータスは 1 または失敗でなければなりません

final CompletionService completionService = new ExecutorCompletionService(getExecutorService());
final List<Future> futures = new ArrayList<Future>();

    FutureTask<Integer> tasks = new FutureTask<Integer>(new Callable<Integer>() {

        public Integer call() {

            int status = 0;
            boolean fail = false;

            try {
                for (int i = 0; i < 10; i++) {

                    MyRunnable resultObj = null;

                    try {
                        resultObj = (MyRunnable) completionService.take().get();
                    } catch (CancellationException e) {
                        // Skip it ..
                    }

                    if (!fail) {
                        status = resultObj.getStatus();

                        if (status == 1) {
                            fail = true;
                            for (Future future : futures) {
                                if (!future.isCancelled() && !future.isDone())
                                    future.cancel(true); // cancel pending tasks including running tasks 
                            }
                        }
                    }
                }
            } catch (Exception e) {
                 e.printStackTrace();
            }

            return status;
        }

            });

上記のスレッドが開始されました -

ExecutorService pool = Executors.newSingleThreadExecutor();
pool.submit(tasks);

その下で、オブジェクトはプールから借用され、それはブロッキング コールであり、プール サイズを 3 に設定しました。そのため、最初は 3 つの MyRunnable ワーカーがすぐに作成されます。各ワーカーが終了すると、残りのタスクを処理するために再利用されます。

for (int i = 0 ; i < 10; i ++;) {

    MyRunnable myRunnable = null;
    myRunnable = (MyRunnable) this.getGenericObjectPool().borrowObject();

    set myRunnable ..

    futures.add(completionService.submit(myRunnable, myRunnable));

}

while (!tasks.isDone()) {

        try {
            Thread.sleep(Global.WaitTime());            
        } catch (InterruptedException iex) {            
        }

}

finalStatus = tasks.get();
pool.shutdown();

GenericObjectPool は、オブジェクトを再利用するために構成されています。最初のスレッドを強制的に失敗させ、そのステータスを 1 に設定することにより、IDE でテストをシミュレートしました。 GenricObjectPool によって行われる新しいオブジェクトのアクティブ化の一部として 0 に戻ります。

そのため、失敗したスレッドからステータスを読み取ることができません。MyRunnable は Callable ではないため、 completionService.submit(obj,obj) を使用して Runnable をだます必要がありました

プール サイズを 10 以上にすると、この問題は発生しません。これは、オブジェクトが再利用されず、それぞれのステータスを正常に読み取れるためですが、それはオプションではありません。

4

1 に答える 1

0

これを修正するために Runnable の CallableDecorator を作成しました。GenericObjectPool を使用しても適切な戻り値が得られるようになりました。ステータスを読み取るための Pool オブジェクトへの依存関係がなくなったため、オブジェクトを再利用してもステータスがリセットされることはありません -

したがって、コードの 2 つの変更 - 変更

futures.add(completionService.submit(myRunnable, myRunnable));

futures.add(completionService.submit(new CallableDecorator(myRunnable)));

新しいクラスを追加する

public class CallableDecorator implements Callable {

       IRunnable r;

       public CallableDecorator(IRunnable r) {

           this.r = r;
       }

       public Integer call() {

           r.run();
           return r.statusCode();
       }
}

interface IRunnable extends Runnable {
     public Integer statusCode();
}

同様に、監視スレッドで値を取得するには、resultObj を整数に変更する必要があります。

于 2013-02-28T14:45:56.870 に答える