2

私は CompletionService クラスを調査していますが、送信キューを完了キューから切り離すことが非常に役立つことがわかりました。

しかし、キャンセルされたタスクもポーリング/取得する方法がありません(ある意味で完了したと見なすことができます)。なんとなく簡単にできるかな?

Future<String> task1Future =  completionService.submit(myCallableTask1);
Future<String> task2Future =  completionService.submit(myCallableTask2);
task2Future.cancel();
Future<String> lastComplTaskFuture = completionService.take(); 
                        //Seems to return only the completed or 
                        //interrupted tasks, not those cancelled (task2)

編集:いくつかの回答を確認した後、何が起こっているのかわかりました。CompletionService は、送信されたジョブと同じ順序で返されます。ジョブ a、b、c を実行した場合。a が動作している間は b と c をキャンセルします。そして最後に completitionService をポーリングすると、タスクが終了するまで b と c のキャンセルは通知されません。また、個々のタスクをキャンセルする代わりにエグゼキューターをシャットダウンすると、アクティブなタスクがまだ終了していない間にキャンセルが完了しても、キューに残っているタスクは完了サービスに到達しないことに気付きました。

EDIT2:わかりました、テストケース全体を追加しました

import java.util.concurrent.*;

public class CompletionTest {

    public static void main(String[] args){
        CompletionService<String> completion = 
                new ExecutorCompletionService<String>(Executors.newSingleThreadExecutor()); 
        Future<String> aFuture =
                 completion.submit(() -> {Thread.sleep(10000); return "A";});
        completion.submit(() -> {return "B";}).cancel(true);
        completion.submit(() -> {return "C";}).cancel(true);
        completion.submit(() -> {return "D";}).cancel(true);

        long arbitraryTime = System.currentTimeMillis();
        while (true){
            String result = getNextResult(completion);
            System.out.println(result);

            if (System.currentTimeMillis() > arbitraryTime+5000){
                aFuture.cancel(true);
            }
         }
    }

    public static String getNextResult(CompletionService<String> service){
        try {
            String taskId = service.take().get();
            return "'"+taskId+"' job completed successfully.";
        } 
        catch (InterruptedException e ) { return "Unknown job was interrupted/stopped while waiting (no thread.wait() or infinite loop in the job so I guess this should not be possible)."; } 
        catch (CancellationException e) { return "Unknown job was cancelled."; }  
        catch (ExecutionException e) { 
            return "Unknown job returned with the following internal exception while running: " + e.getCause().getMessage();
        }
    }
}

次のような出力が期待されました。

Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
'A' job completed successfully.

しかし、代わりに次のようになりました。

'A' job completed successfully.
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.

PriorityBlockingQueue を CompletionService のキューとして使用しようとしましたComparator.<Future<String>,Boolean>comparing(Future::isCancelled).reversed()が、どちらも機能しませんでした (ただし、要素の状態が変化した場合は解決しないと思います)。

4

1 に答える 1