10

私はいくつかのアプリケーションに取り組んでおり、さまざまなタスクを処理するために ThreadPoolExecutor を使用しています。ThreadPoolExecutor は、しばらくするとスタックします。より単純な環境でこれをシミュレートするために、問題をシミュレートできる単純なコードを作成しました。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor {
    private int poolSize = 10;
    private int maxPoolSize = 50;
    private long keepAliveTime = 10;
    private ThreadPoolExecutor threadPool = null;
    private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
            100000);

    public MyThreadPoolExecutor() {
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, queue);
        threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {

            @Override
            public void rejectedExecution(Runnable runnable,
                    ThreadPoolExecutor threadPoolExecutor) {
                System.out
                        .println("Execution rejected. Please try restarting the application.");
            }

        });
    }

    public void runTask(Runnable task) {
        threadPool.execute(task);
    }

    public void shutDown() {
        threadPool.shutdownNow();
    }
    public ThreadPoolExecutor getThreadPool() {
        return threadPool;
    }

    public void setThreadPool(ThreadPoolExecutor threadPool) {
        this.threadPool = threadPool;
    }

    public static void main(String[] args) {
        MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor();
        for (int i = 0; i < 1000; i++) {
            final int j = i;
            mtpe.runTask(new Runnable() {

                @Override
                public void run() {
                    System.out.println(j);
                }

            });
        }
    }
}

このコードを数回実行してみてください。通常、コンソールに番号が出力され、すべてのスレッドが終了すると、番号が存在します。しかし、時々、すべてのタスクを終了してから終了しないことがあります。スレッド ダンプは次のとおりです。

MyThreadPoolExecutor [Java Application] 
    MyThreadPoolExecutor at localhost:2619 (Suspended)  
        Daemon System Thread [Attach Listener] (Suspended)  
        Daemon System Thread [Signal Dispatcher] (Suspended)    
        Daemon System Thread [Finalizer] (Suspended)    
            Object.wait(long) line: not available [native method]   
            ReferenceQueue<T>.remove(long) line: not available    
            ReferenceQueue<T>.remove() line: not available    
            Finalizer$FinalizerThread.run() line: not available 
        Daemon System Thread [Reference Handler] (Suspended)    
            Object.wait(long) line: not available [native method]   
            Reference$Lock(Object).wait() line: 485 
            Reference$ReferenceHandler.run() line: not available    
        Thread [pool-1-thread-1] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-2] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-3] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-4] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-6] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-8] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-5] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-10] (Suspended)   
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-9] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-7] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [DestroyJavaVM] (Suspended)  
    C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM)  

私の実際のアプリケーションでは、ThreadPoolExecutor スレッドがこの状態になり、応答を停止します。

よろしく、ラビ・ラオ

4

1 に答える 1

10

メソッドでは、mainを呼び出すことはありませんmtpe.shutdown()ThreadPoolExecutorは、そのcorePoolSize存続を無期限に維持しようとします。運が良ければ、複数のcorePoolSizeスレッドが生きているため、すべてのワーカー スレッドは、指定した 10 秒のタイムアウト時間後に終了できる条件付きロジック ブランチに入ります。ただし、お気付きのように、これが当てはまらない場合もあるため、executor 内のすべてのスレッドがArrayBlockingQueue.take()でブロックされ、新しいタスクを待機します。

また、 ExecutorService.shutdown()ExecutorService.shutdownNow()には大きな違いがあることに注意してください。ラッパー実装が示すように ExecutorService.shutdownNow() を呼び出すと、実行のために割り当てられていないいくつかのタスクがドロップされることがあります。

更新:私の最初の回答以来、ThreadPoolExecutor元の投稿のプログラムが終了しないように実装が変更されました。

于 2010-06-17T06:42:03.520 に答える