2

次の SupervisedExecutor および ExecutorSuperviser の独自の実装を使用すると、パフォーマンスが低下するように思えます。このコードでは何が非効率的だと思いますか? どうすればその効率を改善できるかを知りたいです。

ExecutorSupervisor クラス:

public class ExecutorSuperviser {
private SupervisedExecutor[] threadPool;
private int poolSize = 0;
private LinkedList<Runnable> q;\\my own implementation of linkedlist
public ExecutorSuperviser(int nThreads) {
    threadPool=new SupervisedExecutor[poolSize=nThreads];
    q=new LinkedList<Runnable>();
    init();
}
public void execute(Runnable r) { 
    synchronized (q) {
        q.addToTail(r);
    }
    for (int i=0;i<poolSize;i++) 
            if (!threadPool[i].isBusy()) {
                if (!threadPool[i].isAlive()) threadPool[i].start();
                threadPool[i].interrupt();
                return;
            }


}
private void init() {
    for (int i=0;i<poolSize;i++) {
        threadPool[i]=new SupervisedExecutor(this);
    }

}
public Object getLock() {
    return q;
}
public Runnable getTask() {
    return q.removeHead();
}
public void terminate() {
    for (int i=0;i<poolSize;i++) 
        threadPool[i].terminate();
}
public void waitUntilFinished() {
    while (!isFinished()) {
        try {
            Thread.sleep(Thread.MAX_PRIORITY);
        } catch (InterruptedException e) {}
    }
}
private boolean isFinished() {
    for (int i=0;i<poolSize;i++) 
        if (threadPool[i].isBusy()) return false;
    return q.isEmpty();
}

}

SupervisedExecutor クラス:

public class SupervisedExecutor extends Thread {
    private boolean   terminated = false;
    private Boolean busy = false;
    private ExecutorSuperviser boss;
    SupervisedExecutor (ExecutorSuperviser boss) {
        this.boss=boss;
    }
    public void run() {
        while (!terminated) {
            try {
                sleep(MAX_PRIORITY);
            } catch (InterruptedException e) {
                synchronized (busy) {
                    busy=true;
                }
                Runnable r;
                while (true) {
                    synchronized (boss.getLock()) {
                        r=boss.getTask();
                    }
                    if (r!=null) r.run();
                    else break;
                } 
                synchronized (busy) {
                    busy=false;
                }
            }
        }
    }

    public boolean isBusy() {
        boolean isBusy;
        synchronized (boss.getLock()) {
            isBusy=busy;
        }
        return isBusy;
    }
    public void terminate() {
        terminated=true;
    }

}
4

2 に答える 2

1

次の利点を持つ次のソリューションはどうですか。

  1. ThreadPoolExecutorのサブクラスとして、ThreadPoolExecutor目的の機能を取得するためだけにすべてを再実装する必要はありませんwaitUntilFinished()

  2. ReentrantLockCondition、およびawait()/を利用することでsignal()、確実にパフォーマンスを低下させる可能性のあるビジー状態の待機を回避できます。

この実装は、アクティブなタスクの独自の数を保持するために公開するbeforeExecute()およびafterExecute()メソッドを利用することによって機能します。ThreadPoolExecutorJavaDocによると、正確な答えを保証しないため、使用getActiveCount()しません(おそらく正確な答えが得られる場合は、ThreadPoolExecutorさらに調査する必要があります)。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class WaitableThreadPoolExecutor extends ThreadPoolExecutor
{
    private Condition waitCondition;
    private ReentrantLock lock;
    private int taskCount = 0;

    public WaitableThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue )
    {
        super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue );

        lock = new ReentrantLock( );
        waitCondition = lock.newCondition( );
    }

    // if isEmpty() is true, then there is no need to block
    // otherwise, wait until waitCondition is signaled
    public void waitUntilFinished( )
    {
        lock.lock( );
        try
        {
            while ( !isEmpty( ) )
                waitCondition.await( );
        }
        catch ( InterruptedException e )
        {
            e.printStackTrace();
        }
        finally
        {
            lock.unlock( );
        }
    }

    // the ThreadPool is empty if our taskCount is 0 and the
    // work queue is empty (this may not be bullet-proof, for one
    // thing, I'm hesitant to use getActiveCount() because it
    // does not guarantee an exact answer
    protected boolean isEmpty( )
    {
        lock.lock( );
        try
        {
            return taskCount == 0 && getQueue( ).isEmpty( );
        }
        finally
        {
            lock.unlock( );
        }
    }

    // increment our task count before executing each task
    @Override
    protected void beforeExecute( Thread t, Runnable r )
    {
        super.beforeExecute( t, r );

        lock.lock( );
        try
        {
            taskCount += 1;
        }
        finally
        {
            lock.unlock( );
        }
    }

    // decrement our task count after executing each task
    // then, if the pool is empty, signal anyone waiting
    // on the waitCondition
    @Override
    protected void afterExecute( Runnable r, Throwable t )
    {
        super.afterExecute( r, t );

        lock.lock( );
        try
        {
            taskCount -= 1;

            if ( isEmpty( ) ) waitCondition.signalAll( );
        }
        finally
        {
            lock.unlock( );
        }
    }

    public static void main( String[] args )
    {
        WaitableThreadPoolExecutor pool = new WaitableThreadPoolExecutor( 2, 4, 5000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>( ) );

        for ( int i = 0 ; i < 10 ; i++ )
        {
            final int threadId = i;

            pool.execute( new Runnable( )
            {
                @Override
                public void run( )
                {
                    try { Thread.sleep( (int) ( Math.random( ) * 5000 ) ); } catch ( InterruptedException e ) { }

                    System.out.println( threadId + " done." );
                }
            });
        }

        pool.waitUntilFinished( );

        System.out.println( "Done waiting." );
    }
}

main()テストケースとして使用できる簡単な方法を含めました。完了したことを出力する前にランダムな時間待機する 10 個のスレッドを開始します。次に、メイン スレッドが を呼び出しますwaitUntilFinished()

結果は次のようになります (主なポイントは、Done waiting.常に最後に出力されることです:

1 done.
2 done.
0 done.
4 done.
3 done.
5 done.
7 done.
8 done.
6 done.
9 done.
Done waiting.
于 2012-04-19T23:20:47.947 に答える
0

個人的には、プレーンな ExecutorService を使用すると、より短くて簡単になります。

注: 必要なコードはこれだけです。

ExecutorService es = Executors.newCachedThreadPool();

List<Future<Void>>futures = new ArrayList<Future<Void>>();
for (int i = 0; i < 10; i++) {
    final int threadId = i;
    futures.add(es.submit(new Callable<Void>() {
        @Override
        public Void call() throws InterruptedException {
            Thread.sleep((int) (Math.random() * 1000));
            System.out.println(threadId + " done.");
            return null;
        }
    }));
}
for (Future<Void> future : futures)
    future.get();
System.out.println("Done waiting.");

es.shutdown();

版画

2 done.
4 done.
7 done.
6 done.
8 done.
5 done.
9 done.
1 done.
3 done.
0 done.
Done waiting.
于 2012-04-20T07:21:57.867 に答える