0

複数のタスクを送信して、利用可能なときに結果を取得しようとしています。ただし、ループの終了後、すべてのタスクが指定された時間内に完了するように強制する必要があります。そうでない場合は、エラーをスローします。最初は、executorService の invokeAll、shutdown、およびawaitTerminationの呼び出しだけで、すべてのタスクが (エラーの有無にかかわらず) 確実に完了するようにしました。CompletionServiceを使用して結果を表示するようにコードを移行しました。CompletionService 呼び出しで awaitTermination 句をどこに適用できますか?

 CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
            logger.info("Submitting all tasks");
            for (Callable<String> task : tasks)
                completionService.submit(task);
            executor.shutdown();
            logger.info("Tasks submitted. Now checking the status.");
            while (!executor.isTerminated())
            {
                final Future<String> future = completionService.take();
                String itemValue;
                try
                {
                    itemValue = future.get();
                    if (!itemValue.equals("Bulk"))
                        logger.info("Backup completed for " + itemValue);
                }
                catch (InterruptedException | ExecutionException e)
                {
                    String message = e.getCause().getMessage();
                    String objName = "Bulk";
                    if (message.contains("(") && message.contains(")"))
                        objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
                    logger.error("Failed retrieving the task status for " + objName, e);
                }
            }
executor.awaitTermination(24, TimeUnit.HOURS);

つまり、CompletionService のタイムアウトをどのように利用できますか?

編集:

私が持っていた最初のコードは以下に表示されました。問題は、将来のリストを反復処理してから、完了したものとして出力していることです。ただし、私の要件は、FCFS ベースで完了したものを表示することです。

List<Future<String>> results = executor.invokeAll(tasks);
        executor.shutdown();
        executor.awaitTermination(24, TimeUnit.HOURS);

        while (results.size() > 0)
        {
            for (Iterator<Future<String>> iterator = results.iterator(); iterator.hasNext();)
            {
                Future<String> item = iterator.next();
                if (item.isDone())
                {
                    String itemValue;
                    try
                    {
                        itemValue = item.get();
                        if (!itemValue.equals("Bulk"))
                            logger.info("Backup completed for " + itemValue);
                    }
                    catch (InterruptedException | ExecutionException e)
                    {
                        String message = e.getCause().getMessage();
                        String objName = "Bulk";
                        if (message.contains("(") && message.contains(")"))
                            objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
                        logger.error("Failed retrieving the task status for " + objName, e);
                    }
                    finally
                    {
                        iterator.remove();
                    }
                }
            }
        }
4

2 に答える 2

1

エグゼキューターが別のスレッドで終了するのを待つことをお勧めします

そうすれば、結果 FCFS を提供し、タイムアウトを強制することもできます。

次のようなもので簡単に実現できます

CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);

// place all the work in a function (an Anonymous Runnable in this case)
// completionService.submit(() ->{work});
// as soon as the work is submitted it is handled by another Thread

completionService.submit(() ->{
    logger.info("Submitting all tasks");
    for (Callable<String> task : tasks)
    completionService.submit(task);
    logger.info("Tasks submitted. Now checking the status.");
    int counter = tasks.size();
    for(int i = counter; counter >=1; counter--)  // Replaced the while loop
    {
        final Future<String> future = completionService.take();
        String itemValue;
        try
        {
            itemValue = future.get();
            if (!itemValue.equals("Bulk"))
                logger.info("Backup completed for " + itemValue);
        }
        catch (InterruptedException | ExecutionException e)
        {
            String message = e.getCause().getMessage();
            String objName = "Bulk";
            if (message.contains("(") && message.contains(")"))
                objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
            logger.error("Failed retrieving the task status for " + objName, e);
        }
    }
});

// After submitting the work to another Thread
// Wait in your Main Thread, and enforce termination if needed
shutdownAndAwaitTermination(executor);

これを使用してエグゼキューターの終了 && 待機を処理します ( ExecutorsServiceから取得)

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(24, TimeUnit.HOURS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }
于 2015-10-27T21:10:34.103 に答える
0

では、完了を監視する必要があります。では、なぜドキュメントに従って使用しないのでしょうか? https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.htmlしたがって、nタスクを の新しいインスタンスに送信し、完了するまでExecutorCompletionService待機nします。再び終了する必要はありません。同じエグゼキューターを再利用できます (通常、スレッド プール。新しいスレッドを作成すると、プールから再利用するよりもコストがかかります)。したがって、ドキュメントのコードをシナリオに適合させると、次のようになります。

 CompletionService<Result> ecs
         = new ExecutorCompletionService<String>(executor);
 for (Callable<Result> task : tasks)
     ecs.submit(task);
 logger.info("Tasks submitted. Now checking the status.");
 int n = tasks.size();
 for (int i = 0; i < n; ++i) {
     try {
       String r = ecs.take().get();
       logger.info("Backup completed for " + r);
     }
     catch(InterruptedException | ExecutionException e) {
         ...
     }
 }

また、例外メッセージを解析するのは悪い考えです。独自の例外クラスを作成してinstanceof.

完了のタイムアウトが必要な場合は、 のpoll代わりに時間パラメーターを使用しますtake

于 2015-10-27T22:28:14.330 に答える