7

スレッドが完了しても、私はトリッキーな状況を抱えていfuture.isDone()ます。false

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DataAccessor {
    private static ThreadPoolExecutor executor;
    private int timeout = 100000;
    static {
        executor = new ThreadPoolExecutor(10, 10, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
    }

    public static void main(String[] args) {
        List<String> requests = new ArrayList<String>();
        for(int i=0; i<20; i++){
            requests.add("request:"+i);
        }
        DataAccessor dataAccessor = new DataAccessor();

        List<ProcessedResponse> results = dataAccessor.getDataFromService(requests);
        for(ProcessedResponse response:results){
            System.out.println("response"+response.toString()+"\n");
        }
        executor.shutdown();
    }

    public List<ProcessedResponse> getDataFromService(List<String> requests) {
        final CountDownLatch latch = new CountDownLatch(requests.size());
        List<SubmittedJob> submittedJobs = new ArrayList<SubmittedJob>(requests.size());
        for (String request : requests) {
            Future<ProcessedResponse> future = executor.submit(new GetAndProcessResponse(request, latch));
            submittedJobs.add(new SubmittedJob(future, request));
        }
        try {
            if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
                // some of the jobs not done
                System.out.println("some jobs not done");
            }
        } catch (InterruptedException e1) {
            // take care, or cleanup
            for (SubmittedJob job : submittedJobs) {
                job.getFuture().cancel(true);
            }
        }
        List<ProcessedResponse> results = new LinkedList<DataAccessor.ProcessedResponse>();
        for (SubmittedJob job : submittedJobs) {
            try {
                // before doing a get you may check if it is done
                if (!job.getFuture().isDone()) {
                    // cancel job and continue with others
                    job.getFuture().cancel(true);
                    continue;
                }
                ProcessedResponse response = job.getFuture().get();
                results.add(response);
            } catch (ExecutionException cause) {
                // exceptions occurred during execution, in any
            } catch (InterruptedException e) {
                // take care
            }
        }
        return results;
    }

    private class SubmittedJob {
        final String request;
        final Future<ProcessedResponse> future;

        public Future<ProcessedResponse> getFuture() {
            return future;
        }

        public String getRequest() {
            return request;
        }

        SubmittedJob(final Future<ProcessedResponse> job, final String request) {
            this.future = job;
            this.request = request;
        }
    }

    private class ProcessedResponse {
        private final String request;
        private final String response;

        ProcessedResponse(final String request, final String response) {
            this.request = request;
            this.response = response;
        }

        public String getRequest() {
            return request;
        }

        public String getResponse() {
            return response;
        }

        public String toString(){
            return "[request:"+request+","+"response:"+ response+"]";
        }
    }

    private class GetAndProcessResponse implements Callable<ProcessedResponse> {
        private final String request;
        private final CountDownLatch countDownLatch;

        GetAndProcessResponse(final String request, final CountDownLatch countDownLatch) {
            this.request = request;
            this.countDownLatch = countDownLatch;
        }

        public ProcessedResponse call() {
            try {
                return getAndProcessResponse(this.request);
            } finally {
                countDownLatch.countDown();
            }
        }

        private ProcessedResponse getAndProcessResponse(final String request) {
            // do the service call
            // ........
            if("request:16".equals(request)){
                throw (new RuntimeException("runtime"));
            }
            return (new ProcessedResponse(request, "response.of." + request));
        }
    }
}

私が呼び出すfuture.isDone()と、trueが返されますfalseが返されcoundownLatch.await()ます。何か案が?また、これが発生するとすぐに countDownLatch.await が出てくることに注意してください。

ここで読み取り不可能な形式のビューを見つけた場合は、http://tinyurl.com/7j6cvep .

4

4 に答える 4

9

問題はおそらくタイミングの問題です。Future に関してすべてのタスクが実際に完了する前に、ラッチが解放されます (countDown()呼び出しがcall()メソッド内にあるため)。

基本的にCompletionServiceの作業を再作成しています(実装はExecutorCompletionServiceです)。代わりにそれを使用することをお勧めします。メソッドを使用しpoll(timeout)て結果を取得できます。合計時間を追跡し、各呼び出しのタイムアウトを合計残り時間まで減らすようにしてください。

于 2012-03-09T20:12:35.027 に答える
3

jtahlborn が述べたように、これはおそらく、CountdownLatch が待機中のスレッドにシグナルを送る競合状態であり、待機中のスレッドは、FutureTask が実行を終了する前に Future のキャンセル条件を評価します (これは の後のある時点で発生しますcountDown)。

CountdownLatch の同期メカニズムが Future の同期メカニズムと同期しているとは限りません。あなたがすべきことは、それがいつ完了したかを教えてくれる未来に頼ることです。

Future.get(long timeout, TimeUnit.MILLISECONDS)の代わりにできますCountdownLatch.await(long timeout, TimeUnit.MILLISECONDS)。ラッチと同じタイプの効果を得るには、すべての を に追加FutureList、リストを反復処理して、各 Future を取得します。

于 2012-03-10T11:01:36.237 に答える
2

競合状態のシナリオは次のとおりです。

  • メイン スレッドは にありlatch.await、ミリ秒間 Java スケジューラから CPU スロットを受け取りません
  • countDownLatch.countDown()内の最後のエグゼキュータ スレッド呼び出しfinally
  • Java スケジューラは、メイン スレッドがしばらく待機しているため、メイン スレッドをより優先することを決定します。
  • その結果、最後の結果を要求してもFuture、最後のエグゼキュータ スレッドは結果を伝播するためのタイム スライスを取得していないため、まだ利用できませんfinally

Javaスケジューラが実際にどのように機能するかについての詳細な説明は見つかりませんでした。おそらく、主にJVMを実行しているオペレーティングシステムに依存しているためですが、一般的に言えば、一定期間平均して実行可能なスレッドにCPUを均等に割り当てようとします。そのため、メイン スレッドは、他のスレッドが句isDoneを離れる前にテストに到達できます。finally

の後に結果の収集を変更することをお勧めしますlatch.await。ご存じのとおり、ラッチがゼロに減少したため (メイン スレッドが中断された場合を除く)、すべての結果がすぐに利用できるようになります。get メソッドに timeout を指定すると、スケジューラは、finally 句でまだ待機している最後のスレッドにタイム スライスを割り当てることができます。

    for (SubmittedJob job : submittedJobs) {
        try {
            ProcessedResponse response = null;
            try {
                // Try to get answer in short timeout, should be available
                response = job.getFuture().get(10, TimeUnit.MILLISECONDS);
            } catch (TimeoutException te) {
                job.getFuture().cancel(true);
                continue;
            }
            results.add(response);
        } catch (ExecutionException cause) {
            // exceptions occurred during execution, in any
        } catch (InterruptedException e) {
            // take care
        }
    }

備考:getAndProcessResponseメソッドが 1 ミリ秒未満で終了するため、コードは現実的ではありません。そこにランダムなスリープがあるため、競合状態はそれほど頻繁には発生しません。

于 2012-03-13T23:41:34.977 に答える
0

競合状態に関する意見には賛成です。ラッチを忘れて使用することをお勧めします java.util.concurrent.ThreadPoolExecutor.awaitTermination(long, TimeUnit)

于 2012-03-15T17:38:05.567 に答える