0

Active Objects を使用して Thread-Pool の簡単な実装を作成しようとしています。

これが私の main メソッドです:

/**
 * Demo entry point: creates a pool with a task-queue capacity of 100 and at most
 * 3 workers, submits one task that sleeps five seconds and then prints "42",
 * and finally requests a graceful shutdown.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    MyThreadPool tp = new MyThreadPool(100, 3);
    tp.execute(() -> {
        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            // Do not discard the interrupt: restore the flag so the thread
            // running this task can still observe that it was interrupted.
            Thread.currentThread().interrupt();
        }
        System.out.println("42");
    });
    tp.shutDown();
}

通常は main から shutDown メソッドが先に呼び出されてしまうため、アクティブ オブジェクトが不必要に「生きた」ままになりますが、期待どおりの結果が得られることもあります。実行するたびに結果が変わるのはなぜでしょうか?

以下に、残りのクラスを示します。

public class MyThreadPool {

    ArrayBlockingQueue<Runnable> q;
    ArrayBlockingQueue<ActiveObject> activeObjects;
    volatile boolean stop;
    AtomicInteger count;
    Thread t;
    Runnable stopTask;

    public MyThreadPool(int capacity, int maxThreads) {
        activeObjects = new ArrayBlockingQueue<>(maxThreads);
        q = new ArrayBlockingQueue<>(capacity);
        count = new AtomicInteger(0);
        stopTask = ()->stop = true;

        t=new Thread(()->{
            //System.out.println("Thread-Pool Started");
            while(!stop){
                //if queue is empty it is gonna be a blocking call
                try {
                    Runnable task = q.take();
                    if(task==stopTask)
                        stopTask.run();
                    else
                        //size() is atomic integer
                        if (count.get() < maxThreads) {
                            ActiveObject a = new ActiveObject(capacity);
                            activeObjects.put(a);
                            count.incrementAndGet();
                            a.execute(task);
                        }
                        //we will assign the next task to the least busy ActiveObject
                        else {
                            int minSize = Integer.MAX_VALUE;
                            ActiveObject choice = null;
                            for (ActiveObject a : activeObjects) {
                                if (a.size() < minSize) {
                                    minSize = a.size();
                                    choice = a;
                                }
                            }
                            choice.execute(task);
                        }

                } catch (InterruptedException e) { }
            }
            //System.out.println("Thread-Pool Ended");
        });
       t.start();
    }

    //execute returns right away - just puts into the queue
    public void execute(Runnable r ){
        // if capacity is full it is gonna be a blocking call
        if(!stop)
            try { q.put(r); } catch (InterruptedException e) { }
    }

    public void shutDownNow(){
        activeObjects.forEach(a->a.shutDownNow());
        stop = true;
        t.interrupt();
    }
    public void shutDown(){
        activeObjects.forEach(a->a.shutDown());
        execute(stopTask);
    }
/**
 * A single worker thread with its own bounded task queue. Tasks are taken from
 * the queue and run one at a time on the worker thread.
 */
public class ActiveObject {

    /** Pending tasks; bounded, so {@link #execute} blocks while it is full. */
    ArrayBlockingQueue<Runnable> q;
    /** Once true, the worker loop exits and {@link #execute} ignores new tasks. */
    volatile boolean stop;
    /** The worker thread; started by the constructor. */
    Thread t;

    /**
     * Creates the worker and starts its thread.
     *
     * @param capacity capacity of the task queue
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     *                                  (rejected by {@code ArrayBlockingQueue})
     */
    public ActiveObject(int capacity) {
        q = new ArrayBlockingQueue<>(capacity);
        t = new Thread(() -> {
            while (!stop) {
                try {
                    // Blocks while the queue is empty.
                    q.take().run();
                } catch (InterruptedException ignored) {
                    // Thrown by take() only. shutDownNow() sets stop before
                    // interrupting, so the loop condition ends the loop.
                } catch (RuntimeException e) {
                    // A failing task must not kill the worker: the tasks still
                    // queued (including the shutdown marker) would never run.
                    // Report it the same way an uncaught exception would be.
                    Thread self = Thread.currentThread();
                    self.getUncaughtExceptionHandler().uncaughtException(self, e);
                }
            }
        });

        t.start();
    }

    /**
     * Queues a task for the worker thread. Blocks while the queue is full. The
     * task is ignored if the worker has already been stopped.
     *
     * @param r the task to run
     */
    public void execute(Runnable r) {
        if (stop) {
            return;
        }
        try {
            q.put(r);
        } catch (InterruptedException e) {
            // The task was not queued; keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /** Stops the worker immediately: sets the stop flag and interrupts the thread. */
    public void shutDownNow() {
        stop = true;
        t.interrupt();
    }

    /**
     * Stops the worker gracefully by queueing a task that sets the stop flag,
     * so every task queued before it still runs.
     */
    public void shutDown() {
        execute(() -> stop = true);
    }

    /**
     * @return the number of tasks currently waiting in the queue
     */
    public int size() {
        return q.size();
    }
}
4

1 に答える 1