83

に変換しようとしList<CompletableFuture<X>>ていCompletableFuture<List<T>>ます。これは、多くの非同期タスクがあり、それらすべての結果を取得する必要がある場合に非常に便利です。

それらのいずれかが失敗すると、最終的な未来が失敗します。これが私が実装した方法です:

public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
    if(com.isEmpty()){
        throw new IllegalArgumentException();
    }
    Stream<? extends CompletableFuture<T>> stream = com.stream();
    CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
    return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
        x.add(y);
        return x;
    },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
        ls1.addAll(ls2);
        return ls1;
    },exec));
}

実行するには:

ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep((long) (Math.random() * 10));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

それらのいずれかが失敗した場合、それは失敗します。百万の先物があっても、期待どおりの出力が得られます。私が抱えている問題は次のとおりです。5000を超える先物があり、それらのいずれかが失敗した場合、次のようになりますStackOverflowError:

java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) でのスレッド "pool-1-thread-2611" java.lang.StackOverflowError での例外 java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java) :1487) java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) で java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) で java.util.concurrent.CompletableFuture$ThenCompose.run( CompletableFuture.java:1487)

私は何を間違っていますか?

注: 上記の返された Future は、Future のいずれかが失敗したときに失敗します。受け入れられた回答もこの点を取る必要があります。

4

9 に答える 9

100

使用CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

実装に関するいくつかのコメント:

.thenComposeAsync.thenApplyAsyncおよびの使用は、.thenCombineAsync期待どおりに機能しない可能性があります。これらの...Asyncメソッドは、提供された関数を別のスレッドで実行します。したがって、あなたの場合、リストへの新しい項目の追加を、提供されたエグゼキューターで実行しています。軽量の操作をキャッシュされたスレッド エグゼキューターに詰め込む必要はありません。thenXXXXAsync正当な理由なしにメソッドを 使用しないでください。

さらに、reduce変更可能なコンテナーに蓄積するために使用しないでください。ストリームがシーケンシャルの場合は正しく動作するかもしれませんが、ストリームが並列になると失敗します。変更可能なリダクションを実行するには、.collect代わりに使用します。

最初の失敗の直後に計算全体を例外的に完了したい場合は、sequenceメソッドで次の操作を行います。

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

さらに、最初の失敗で残りの操作をキャンセルしたい場合は、 のexec.shutdownNow();直後に追加しresult.completeExceptionally(ex);ます。もちろん、これは がexecこの 1 回の計算のためにのみ存在することを前提としています。そうでない場合は、ループして残りをFuture個別にキャンセルする必要があります。

于 2015-05-04T09:26:28.363 に答える
11

Misha が指摘したように、操作を使いすぎて…Asyncいます。さらに、プログラム ロジックを反映しない依存関係をモデル化する複雑な一連の操作を構成しています。

  • リストの最初と 2 番目のジョブに依存するジョブ x を作成します
  • ジョブ x とリストの 3 番目のジョブに依存するジョブ x+1 を作成します
  • ジョブ x+1 とリストの 4 番目のジョブに依存するジョブ x+2 を作成します。
  • …</li>
  • ジョブ x+4999 とリストの最後のジョブに依存するジョブ x+5000 を作成します

次に、(明示的または例外により) キャンセルすると、この再帰的に構成されたジョブが再帰的に実行され、StackOverflowError. それは実装依存です。

Misha が既に示したようにallOf元の意図をモデル化して、リストのすべてのジョブに依存する 1 つのジョブを定義できる方法があります。

ただし、それも必要ではないことに注意してください。無制限のスレッド プール エグゼキュータを使用しているため、結果を収集する非同期ジョブをリストにポストするだけで完了です。いずれにせよ、各ジョブの結果を要求することで、完了を待つことが暗示されます。

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
  .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
    return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
    executorService);

スレッドの数が制限されており、ジョブが追加の非同期ジョブを生成する可能性がある場合は、依存する操作を構成するためのメソッドを使用することが重要です。これは、待機中のジョブが最初に完了する必要があるジョブからスレッドを盗むことを避けるためですが、ここではそうではありません。

この特定のケースでは、1 つのジョブがこの多数の前提条件ジョブを単純に反復し、必要に応じて待機する方が、この多数の依存関係をモデル化し、各ジョブが依存ジョブに完了を通知するよりも効率的です。

于 2015-05-04T11:08:42.927 に答える
10

Spotify のCompletableFuturesライブラリを取得してallAsListメソッドを使用できます。Futures.allAsListグアバの方法からインスピレーションを得ていると思います。

public static <T> CompletableFuture<List<T>> allAsList(
    List<? extends CompletionStage<? extends T>> stages) {

ライブラリを使用したくない場合の簡単な実装を次に示します。

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
于 2016-05-28T08:47:38.153 に答える