6

私は不自然なコード例を書き上げました。誰かが使うべきコードではないかもしれませんが、うまくいくはずだと私は信じています。ただし、代わりにデッドロックします。hereで説明されている回答を読みましたが、不十分であることがわかりました。

コード例は次のとおりです。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Test {

    public static void main(String argv[]) throws Exception {
        
        int nThreads = 1;
        Executor executor = Executors.newFixedThreadPool( nThreads );
        

        
        CompletableFuture.completedFuture(true)
            .thenComposeAsync((unused)->{
            
                System.err.println("About to enqueue task");
                CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
                executor.execute(() -> {

                    // pretend this is some really expensive computation done asynchronously

                    System.err.println("Inner task");
                    innerFuture.complete(true);
                });
                System.err.println("Task enqueued");
                             
                return innerFuture;
            }, executor).get();

        System.err.println("All done");
        System.exit(0);

    }
    
}

これは以下を出力します:

タスクをエンキューしようとしています

キューに入れられたタスク

そして、ハングします。Executor にはスレッドが 1 つしかなく、innerFuture が償還可能になるのを待っているため、デッドロック状態です。「thenComposeAsync」は、まだ不完全な未来を返し、エグゼキュータでそのスレッドを解放するのではなく、戻り値が償還可能になるのをブロックするのはなぜですか?

これは完全に直観的ではなく、javadocs はあまり役に立ちません。CompletionStages の仕組みを根本的に誤解していますか? それとも、これは実装のバグですか?

4

2 に答える 2

3

そこで、多くの興味深い会話の後、JDK の作成者の 1 人に電子メールを送ることにしました。この動作は意図したものではなく、実際には 1.8u25 に存在するバグであることがわかりました。Java 8 の以降のパッチ バージョンでリリースされる修正プログラムがあります。どれかはわかりません。新しい動作をテストしたい人は、最新の jsr166 jar をここからダウンロードできます。

http://gee.cs.oswego.edu/dl/concurrency-interest/index.html

于 2014-12-21T04:55:05.740 に答える
3

まず、2 つの静的関数を使用してコードを書き直して、何が起こっているかを簡単に確認できるようにします。

// Make an executor equivalent to Executors.newFixedThreadPool(nThreads)
// that will trace to standard error when a task begins or ends
static ExecutorService loggingExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>()) {

                @Override
                protected void beforeExecute(Thread t, Runnable r) {
                    System.err.println("Executor beginning task on thread: " 
                       + t.getName());
                }

                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    System.err.println("Executor finishing task on thread: "
                       + Thread.currentThread().getName());
                }

            };
}

// same as what you pass to thenComposeAsync
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) {
    return b -> {
        System.err.println(Thread.currentThread().getName() 
                   + ": About to enqueue task");
        CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
        executor.execute(() -> {
            System.err.println(Thread.currentThread().getName() 
                   + ": Inner task");
            innerFuture.complete(true);
        });
        System.err.println(Thread.currentThread().getName() 
                   + ": Task enqueued");

        return innerFuture;
    };
}

これで、テスト ケースを次のように記述できます。

ExecutorService e = loggingExecutor(1);

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .join();

e.shutdown();

/* Output before deadlock:
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/

2 番目の Future の結果が計算されるまで、最初のスレッドは解放されないという結論をテストしてみましょう。

ExecutorService e = loggingExecutor(2);  // use 2 threads this time

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .join();

e.shutdown();

/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
Executor finishing task on thread: pool-1-thread-1
*/

実際、スレッド 1 はスレッド 2 が完了するまで保持されているようです。

thenComposeAsyncそれ自体がブロックすることが正しいかどうかを見てみましょう。

ExecutorService e = loggingExecutor(1);

CompletableFuture<Boolean> future = 
        CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e);

System.err.println("thenComposeAsync returned");

future.join();

e.shutdown();

/*
thenComposeAsync returned
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/

thenComposeAsyncブロックしませんでした。CompletableFutureすぐに返され、完了しようとしたときにのみデッドロックが発生しました。では、 によって返される未来を完成させるには何が必要.thenComposeAsync(inner(e), e)でしょうか?

  1. API は、inner(e) が戻るのを待つ必要がありますCompletableFuture<Boolean>
  2. CompletableFuture<Boolean>返されたものも完了するのを待つ必要があります。そうして初めて、未来は完成します。したがって、ご覧のとおり、提案したことを実行して不完全な Future を返すことはできません。

バグですか?内部タスクの計算中に CompletionStage がスレッド 1 を保持するのはなぜですか? ご指摘のとおり、ドキュメントはかなり曖昧で、特定の順序でスレッドをリリースすることを約束していないため、これはバグではありません。また、CompletableFuture の後続のthen*()メソッドには Thread1 が使用されることに注意してください。次の点を考慮してください。

ExecutorService e = loggingExecutor(2);

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .thenRun(() -> System.err.println(Thread.currentThread().getName()
                         + ": All done"))
        .join();

e.shutdown();

/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
pool-1-thread-1: All done
Executor finishing task on thread: pool-1-thread-1
*/

ご覧のとおり、スレッド 1 で .thenRun(...) が実行されました。これは、CompletableFuture の他の *Async(... , Executor exec) メソッドと一致していると思います。

thenComposeAsyncしかし、スレッドのジャグリングを API に任せるのではなく、 の機能を 2 つの個別に制御可能なステップに分割したい場合はどうでしょうか? これを行うことができます:

ExecutorService e = loggingExecutor(1);

completedFuture(true)
        .thenApplyAsync(inner(e), e) // do the async part first
        .thenCompose(x -> x)         // compose separately
        .thenRun(() -> System.err.println(Thread.currentThread().getName()
                        + ": All done"))
        .join();

e.shutdown();

すべてがデッドロックなしで 1 つのスレッドでうまく実行されます。

結論として、あなたが言うように、この動作は直感的ではありませんか? 知らない。なぜthenComposeAsync存在するのか想像できません。メソッドが を返す場合CompletableFuture、それはブロックされるべきではなく、非同期で呼び出す理由があってはなりません。

于 2014-12-19T06:53:52.437 に答える