1

MyCallable("B")次のサンプルコードがあり、他のコードが1秒より速く実行される場合、実行に1秒以上かかるとしましょう。したがって、を呼び出すループ内で、Future.get()をスローしTimeoutExceptionます。

public static void main(String[] args) {
    ExecutorService es = Executors.newFixedThreadPool(2);

    List<Future<String>> futures = new ArrayList<Future<String>>();

    futures.add(es.submit(new MyCallable("A")));
    futures.add(es.submit(new MyCallable("B")));
    futures.add(es.submit(new MyCallable("C")));
    futures.add(es.submit(new MyCallable("D")));
    futures.add(es.submit(new MyCallable("E")));

    try {
        for(Future<String> f  : futures) {
            try {
                System.out.println("result " + f.get(1, TimeUnit.SECONDS));
            }
            catch (TimeoutException e) {
                // how do I know which MyCallable() has timed out?
            } catch (ExecutionException e) {
                System.out.println(e.getMessage());
            }
        }
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
    finally {
        es.shutdown();
    }
}

予想どおり、各MyCallable()インスタンスは実行されますが、タイムアウトしたインスタンスについては、エラー処理を実行したいと思います。これには、どれがどのCallableに関連付けられているかを知る必要がありFutureます。

この関連付けのメカニズムはありますか、それともそのメソッドCallable内のすべてのエラー処理を処理するのは私次第ですか?call()

4

2 に答える 2

1

Map<Future<String>, Callable<String>>a の代わりに aを維持しList<Future<String>>、元の Callable をそのように取得できるようです。

本当に賢くなりたい場合は、OO スタイルで ThreadPoolExecutor を拡張し、Future デコレータ クラスを作成できます。これはおそらくやり過ぎだと思いますが、次のようにすることができます。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


public class FutureWithCallable<T> implements Future<T> {
    private final Callable<T> callable;
    private final Future<T> wrapped;

    public FutureWithCallable(Future<T> wrapped, Callable<T> callable) {
        this.callable = callable;
        this.wrapped = wrapped;
    }

    public Callable<T> getCallable() {
        return callable;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return wrapped.cancel(mayInterruptIfRunning);
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return wrapped.get();
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        return wrapped.get(timeout, unit);
    }

    @Override
    public boolean isCancelled() {
        return wrapped.isCancelled();
    }

    @Override
    public boolean isDone() {
        return wrapped.isDone();
    }
}

その後:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

    public class ExecutorServiceWithCallable extends ThreadPoolExecutor {

        public ExecutorServiceWithCallable(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        public <T> FutureWithCallable submit(Callable<T> callable) {
            Future<T> future = super.submit(callable);
            return new FutureWithCallable<T>(future, callable);
        }

    }
于 2012-08-27T22:40:37.013 に答える