31

私は Java 8 の完了可能な先物で遊んでいます。次のコードがあります。

CountDownLatch waitLatch = new CountDownLatch(1);

CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
    try {
        System.out.println("Wait");
        waitLatch.await(); //cancel should interrupt
        System.out.println("Done");
    } catch (InterruptedException e) {
        System.out.println("Interrupted");
        throw new RuntimeException(e);
    }
});

sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");

assertTrue(future.isCancelled());

assertTrue(future.isDone());
sleep(100); //give it some time to finish

runAsync を使用して、ラッチを待機するコードの実行をスケジュールします。次に、中断された例外が内部でスローされることを期待して、未来をキャンセルします。しかし、スレッドは await 呼び出しでブロックされたままのようであり、将来がキャンセルされても (アサーションが渡されます)、InterruptedException がスローされることはありません。ExecutorService を使用した同等のコードは、期待どおりに機能します。CompletableFuture または私の例のバグですか?

4

6 に答える 6

26

を呼び出すとCompletableFuture#cancel、チェーンの下流部分のみが停止します。上流部分、つまり最終的にcomplete(...)orcompleteExceptionally(...)を呼び出すものは、結果が不要になったというシグナルを受け取りません。

それらの「上流」と「下流」とは何ですか?

次のコードを考えてみましょう。

CompletableFuture
        .supplyAsync(() -> "hello")               //1
        .thenApply(s -> s + " world!")            //2
        .thenAccept(s -> System.out.println(s));  //3

ここでは、データは上から下に流れます。つまり、サプライヤによって作成され、機能によって変更され、によって消費されprintlnます。特定のステップより上の部分は上流と呼ばれ、下の部分は下流と呼ばれます。例)ステップ 1 と 2 は、ステップ 3 のアップストリームです。

舞台裏で何が起こっているかを次に示します。これは正確ではなく、何が起こっているかを示す便利なマインド モデルです。

  1. サプライヤー (ステップ 1) が実行されています (JVM の common 内ForkJoinPool)。
  2. complete(...)その後、サプライヤの結果が次のCompletableFutureダウンストリームに渡されます。
  3. 結果を受け取ると、それは次のステップを呼び出します-前のステップの結果を受け取り、さらに下流のCompletableFutureに渡される何かを返す関数 (ステップ 2) 。CompletableFuturecomplete(...)
  4. ステップ 2 の結果を受け取ると、ステップ 3CompletableFutureでコンシューマが呼び出されますSystem.out.println(s)。コンシューマーが終了すると、ダウンストリームはそのCompletableFuture値を受け取ります。(Void) null

ご覧のとおり、このチェーンのそれぞれは、値が自分の(または)CompletableFutureに渡されるのを待っているダウンストリームが誰であるかを知る必要があります。しかし、アップストリーム (またはアップストリーム - いくつかあるかもしれません) について何も知る必要はありません。complete(...)completeExceptionally(...)CompletableFuture

したがって、ステップ 3を呼び出しcancel()ても、ステップ 1 と 2 は中止されません。これは、ステップ 3 からステップ 2 へのリンクがないためです。

を使用している場合CompletableFuture、ステップは十分に小さいため、いくつかの余分なステップが実行されても害はありません。

キャンセルを上流に伝播する場合は、次の 2 つのオプションがあります。

  • これを自分で実装します-すべてのステップの後にチェックされる専用のCompletableFuture(のように名前を付けます)を作成します(のようなもの)cancelledstep.applyToEither(cancelled, Function.identity())
  • RxJava 2、ProjectReactor/Flux、Akka Streams などのリアクティブ スタックを使用する
于 2017-06-06T18:28:54.283 に答える
0

CancellationException は、内部の ForkJoin キャンセル ルーチンの一部です。future の結果を取得すると、例外が発生します。

try { future.get(); }
      catch (Exception e){
          System.out.println(e.toString());            
      }

デバッガーでこれを確認するのにしばらく時間がかかりました。JavaDoc は、何が起こっているのか、何を期待すべきなのかについて明確ではありません。

于 2014-04-27T22:25:57.503 に答える