5

2スレッドのFixedThreadPoolExecutorServiceにラップされたCompletionServiceを使用して、いくつかのFutureタスクを送信し、送信されたタスクの数に等しいループを設定してセットアップし、completionservice.take()を使用してすべてが完了するか失敗するのを待ちます。問題は非常にまれに終了しないことがあるので(理由はわかりません)、take()メソッドをpoll(300、Timeout.SECONDS)に変更しました。これは、1つのタスクが完了するまでに5分以上かかる場合のアイデアです。ポーリングは失敗し、最終的にはループから抜け出します。すべての先物を調べて、future.cancel(true)を呼び出して、問題のあるタスクを強制的にキャンセルできます。

しかし、コードを実行してハングすると、ポーリングが5分ごとに1回継続して失敗し、それ以上タスクが実行されないことがわかります。そのため、2人のワーカーが何らかの方法でデッドロックし、終了せず、追加のタスクの開始を許可しないと想定します。タイムアウトは5分であり、実行するタスクがまだ1000あるため、ループを解除するのにかかる時間が長すぎたため、ジョブをキャンセルしました。

ですから、私がやりたいのは、5分以内に完了しなかった場合に現在のタスクを中断/強制的にキャンセルすることですが、それを行う方法がわかりません。

このコードサンプルは、Imが話していることの簡略版を示しています

import com.jthink.jaikoz.exception.JaikozException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean>  cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if(result==null)
                {
                    System.out.println("Worker TimedOut:");
                    continue;
                }
                else
                {
                    try
                    {
                        if(result.isDone() && result.get())
                        {
                            System.out.println("Worker Completed:");
                        }
                        else
                        {
                            System.out.println("Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace();
                    }
                }
            }
       }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            //Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println("Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;
    public Worker(int number)
    {
        this.number=number;
    }

    public Boolean call()
    {
        if(number==3)
        {
            try
            {
                Thread.sleep(50000);
            }
            catch(InterruptedException tie)
            {

            }
        }
        return true;
    }
}

出力

Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker TimedOut:
Done
4

3 に答える 3

5

私はそれを解決したと思います。基本的にタイムアウトが発生した場合は、将来のオブジェクトのリストを繰り返し処理して、完了していない最初のオブジェクトを見つけ、キャンセルを強制します。それほどエレガントではないようですが、機能しているようです。

プールのサイズを変更して、ソリューションをより適切に示す出力を表示しましたが、2つのスレッドプールでも機能します。

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean>  cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                System.out.println("Invocation:"+t);
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if(result==null)
                {
                    System.out.println(new Date()+":Worker Timedout:");
                    //So lets cancel the first futures we find that havent completed
                    for(Future future:futures)
                    {
                        System.out.println("Checking future");
                        if(future.isDone())
                        {
                            continue;
                        }
                        else
                        {
                            future.cancel(true);
                            System.out.println("Cancelled");
                            break;
                        }
                    }
                    continue;
                }
                else
                {
                    try
                    {
                        if(result.isDone() && !result.isCancelled() && result.get())
                        {
                            System.out.println(new Date()+":Worker Completed:");
                        }
                        else if(result.isDone() && !result.isCancelled() && !result.get())
                        {
                            System.out.println(new Date()+":Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace(System.out);
                    }
                }
            }
       }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            //Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println(new Date()+":Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;
    public Worker(int number)
    {
        this.number=number;
    }

    public Boolean call()
        throws InterruptedException
    {
        try
        {
            if(number==3)
            {
                Thread.sleep(50000);
            }
        }
        catch(InterruptedException ie)
        {
            System.out.println("Worker Interuppted");
            throw ie;
        }
        return true;
    }
}

出力は

Invocation:0
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:1
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:2
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout:
Checking future
Checking future
Checking future
Cancelled
Invocation:3
Worker Interuppted
Invocation:4
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Invocation:5
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Thu Mar 10 20:51:49 GMT 2011:Done
于 2011-03-10T20:55:40.747 に答える
2

ワーカーの例では、Callableは中断をサポートする呼び出しをブロックしています。実際のコードが組み込みロック(synchronizedブロック)でデッドロックしている場合、中断によってコードをキャンセルすることはできません。代わりに、明示的なロック(java.util.concurrent.Lock)を使用できます。これにより、ロックの取得を待機する時間を指定できます。スレッドがロックの待機中にタイムアウトした場合、おそらくデッドロック状態が発生したために、エラーメッセージが表示されて中止される可能性があります。

ちなみに、あなたの例では、Callable飲み込んではいけませんInterruptedExceptionInterruptedExceptionそれを渡す(再スローするか、メソッド宣言のthrows行に追加する)か、catchブロックでスレッドの中断状態をリセットする必要があります(を使用してThread.currentThread().interrupt())。

于 2011-03-10T17:15:29.657 に答える
1

いつでも呼び出すfuture.get(timeout...)
ことができますまだ終了していない場合はタイムアウト例外が返されます...その後、を呼び出すことができますfuture.cancel()

于 2012-10-01T22:34:29.773 に答える