19

リクエストを別のスレッドに送信するコードがいくつかあります。別のスレッドは、そのリクエストをさらに別のスレッドに送信する場合と送信しない場合があります。の戻り値の型が得られFuture<Future<T>>ます。Future<T>これをすぐに将来のチェーン全体の完了を待つように変える非凶悪な方法はありますか?

私はすでに Guava ライブラリを使用して、他の楽しい並行処理を処理し、Google Collections の代わりとしてうまく機能していますが、この場合の何かを見つけることができないようです。

4

5 に答える 5

7

guava ライブラリを使用する別の可能な実装は、はるかに単純です。

import java.util.concurrent.*;
import com.google.common.util.concurrent.*;
import com.google.common.base.*;

public class FFutures {
  public <T> Future<T> flatten(Future<Future<T>> future) {
    return Futures.chain(Futures.makeListenable(future), new Function<Future<T>, ListenableFuture<T>>() {
      public ListenableFuture<T> apply(Future<T> f) {
        return Futures.makeListenable(f);
      }
    });
  }
}
于 2010-02-09T14:55:07.117 に答える
5

Guava 13.0 はFutures.dereferenceこれを行うために追加します。ListenableFuture<ListenableFuture>プレーンではなく が必要Future<Future>です。(プレーンでの操作にFutureは makeListenable 呼び出しが必要であり、それぞれがタスクの有効期間にわたって専用スレッドを必要とします (メソッドの新しい名前によって明確になっていますJdkFutureAdapters.listenInPoolThread)。)

于 2012-08-07T19:44:53.710 に答える
1

これがフューチャーの契約を履行するためにできる最善のことだと思います。私はそれが契約を確実に満たすように、可能な限り賢くないということに気を配りました。特に、getwithtimeoutの実装ではありません。

import java.util.concurrent.*;

public class Futures {
  public <T> Future<T> flatten(Future<Future<T>> future) {
    return new FlattenedFuture<T>(future);
  }

  private static class FlattenedFuture<T> implements Future<T> {
    private final Future<Future<T>> future;

    public FlattenedFuture(Future<Future<T>> future) {
      this.future = future;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
      if (!future.isDone()) {
        return future.cancel(mayInterruptIfRunning);
      } else {
        while (true) {
          try {
            return future.get().cancel(mayInterruptIfRunning);
          } catch (CancellationException ce) {
            return true;
          } catch (ExecutionException ee) {
            return false;
          } catch (InterruptedException ie) {
            // pass
          }
        }
      }
    }

    public T get() throws InterruptedException, 
                          CancellationException, 
                          ExecutionException 
    {
      return future.get().get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, 
                                                     CancellationException, 
                                                     ExecutionException, 
                                                     TimeoutException 
    {
      if (future.isDone()) {
        return future.get().get(timeout, unit);
      } else {
        return future.get(timeout, unit).get(0, TimeUnit.SECONDS);
      }
    }

    public boolean isCancelled() {
      while (true) {
        try {
          return future.isCancelled() || future.get().isCancelled();
        } catch (CancellationException ce) {
          return true;
        } catch (ExecutionException ee) {
          return false;
        } catch (InterruptedException ie) {
          // pass
        }
      }
    }

    public boolean isDone() {
      return future.isDone() && innerIsDone();
    }

    private boolean innerIsDone() {
      while (true) {
        try {
          return future.get().isDone();
        } catch (CancellationException ce) {
          return true;
        } catch (ExecutionException ee) {
          return true;
        } catch (InterruptedException ie) {
          // pass
        }
      }
    }
  }
}
于 2010-02-09T00:19:29.687 に答える
0

これは私の最初の刺し傷でしたが、間違いがたくさんあると確信しています。のようなものに置き換えていただければ幸いFutures.compress(f)です。

public class CompressedFuture<T> implements Future<T> {
    private final Future<Future<T>> delegate;

    public CompressedFuture(Future<Future<T>> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (delegate.isDone()) {
            return delegate.cancel(mayInterruptIfRunning);
        }
        try {
            return delegate.get().cancel(mayInterruptIfRunning);
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return delegate.get().get();
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
        Future<T> next = delegate.get(timeout, unit);
        return next.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public boolean isCancelled() {
        if (!delegate.isDone()) {
            return delegate.isCancelled();
        }
        try {
            return delegate.get().isCancelled();
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }

    @Override
    public boolean isDone() {
        if (!delegate.isDone()) {
            return false;
        }
        try {
            return delegate.get().isDone();
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }
}
于 2010-01-29T21:31:31.493 に答える
0

次のようなクラスを作成できます。

public class UnwrapFuture<T> implements Future<T> {
    Future<Future<T>> wrappedFuture;

    public UnwrapFuture(Future<Future<T>> wrappedFuture) {
        this.wrappedFuture = wrappedFuture;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        try {
            return wrappedFuture.get().cancel(mayInterruptIfRunning);
        } catch (InterruptedException e) {
            //todo: do something
        } catch (ExecutionException e) {
            //todo: do something
        }
    }
    ...
}

get() は発生できるが、他のメソッドは発生できない例外に対処する必要があります。

于 2010-01-29T21:19:45.437 に答える