に変換しようとしList<CompletableFuture<X>>
ていCompletableFuture<List<T>>
ます。これは、多くの非同期タスクがあり、それらすべての結果を取得する必要がある場合に非常に便利です。
それらのいずれかが失敗すると、最終的な未来が失敗します。これが私が実装した方法です:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
実行するには:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
それらのいずれかが失敗した場合、それは失敗します。百万の先物があっても、期待どおりの出力が得られます。私が抱えている問題は次のとおりです。5000を超える先物があり、それらのいずれかが失敗した場合、次のようになりますStackOverflowError
:
java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) でのスレッド "pool-1-thread-2611" java.lang.StackOverflowError での例外 java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java) :1487) java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) で java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) で java.util.concurrent.CompletableFuture$ThenCompose.run( CompletableFuture.java:1487)
私は何を間違っていますか?
注: 上記の返された Future は、Future のいずれかが失敗したときに失敗します。受け入れられた回答もこの点を取る必要があります。