1

非常に基本的なスレッドプール コードがあります。これは、linkedblockingqueue 内に格納されたワーカー オブジェクトのプールを呼び出します。このコードは、ワーカー オブジェクトを再利用して入力データを出力するだけです。

次のような一貫したデッドロック/フリーズが見つかりました。

public class throttleheapthreadpool{
            private quoteworkerobject[] channels;
            private LinkedBlockingQueue<quoteworkerobject> idlechannels;
            public throttleheapthreadpool(int poolsize,int stocks){
        channels=new quoteworkerobject[poolsize];
        idlechannels=new LinkedBlockingQueue<quoteworkerobject>();

        for(int i=1;i<poolsize;i++){
            channels[i]=new quoteworkerobject(idlechannels);
            idlechannels.add(channels[i]);//All WORKERS to Idle pool to start
        }
    }

    public void execute(Integer quote){
        quoteworkerobject current = null;
        try {
                        //extract worker from pool
            current = (quoteworkerobject)idlechannels.take();
            current.put(quote);
        } catch (InterruptedException e) {
        }
    }

    class quoteworkerobject{
        LinkedBlockingQueue<Integer> taskqueue=new LinkedBlockingQueue<Integer>(); 
        Thread quotethread=null;
        LinkedBlockingQueue<quoteworkerobject> idle=null;
        @SuppressWarnings("unchecked")
        public quoteworkerobject(LinkedBlockingQueue<quoteworkerobject> idlechannels){
            this.idle=idlechannels;
            Runnable r=new Runnable(){
                public void run() {
                    insertquote();
                }
            };
            quotethread=new Thread(r);
            quotethread.start();//spawn a thread from the worker 
        }
        public void put(Integer  quote){
            taskqueue.add(quote);
        }
        public void insertquote(){
            try{
                Integer thisquote=taskqueue.take();
                idle.add(this);
            }
            catch(Exception ex){
            }

        }

    }

    public static void main(String[] args){
        throttleheapthreadpool pool=new throttleheapthreadpool(5,200);
        Random randomGenerator = new Random();

        for(int node=0;node < 20;node++){
            int d=randomGenerator.nextInt(5*200);
            pool.execute(d);
        }
    }

}   

このコードは一貫して 8 回目の実行でフリーズします - その時点で current = (quoteworkerobject)idlechannels.take();

上記の何が間違っていますか?

4

2 に答える 2

3

これがまさに私が(嫌いですか?)そのようなコードを使いたくない理由です。あなたはあなた/私たちの生活を楽にし、数ヶ月後でもそれを見て証明できるコードを書くことを検討する必要があります:それに応じて変数に名前を付け、短いドキュメントや説明を書くなど。何が起こっているのか理解できなかったため、リファクタリングに25分かかりました。

小さなリファクタリングを追加し、いくつかのブレークポイントも追加しました。コードを見てください。説明は内部にあります。しかし、問題はinsertQuoteメソッドにあります-それは早すぎます。

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;

public class Pool {
private Worker[] workers;
private LinkedBlockingQueue<Worker> workerQueue;



/**
 * Create a pool of 5 workers and a {@link LinkedBlockingQueue} to store them
 */
public Pool(int poolsize) {
    //1. First you get here : you create a Pool of 5 Worker Threads and a Queue to store them
    System.out.println("1.");
    workers = new Worker[poolsize];
    workerQueue = new LinkedBlockingQueue<Worker>();

    for (int i = 0; i < poolsize; i++) {
        //2. You instantiate 5 worker Threads and place each of them on the Queue
        System.out.println("2.");
        workers[i] = new Worker(workerQueue);
        workerQueue.add(workers[i]);
    }
}

public void execute(Integer quote) {
    Worker current = null;
    try {
        // extract worker from pool
        //6. Get a worker from the Queue
        System.out.println("6.");
        current = workerQueue.take();
        current.put(quote);
    } catch (InterruptedException e) {
    }
}


/**
 * 
 *
 */
class Worker {
    LinkedBlockingQueue<Integer> taskqueueForEachWorker = new LinkedBlockingQueue<Integer>();
    LinkedBlockingQueue<Worker> workerQueue = null;

    public Worker(LinkedBlockingQueue<Worker> idlechannels) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                //3. You call the insert quote method
                System.out.println("3.");
                insertquote();
            }
        }).start();
    }

    public void put(Integer quote) {
        //7. Add a task for each Thread to do
        System.out.println("7.");
        taskqueueForEachWorker.add(quote);
    }

    //TODO The problem is here: After you execute this line : workerQueue.add(this); this method ends, NO MORE worker Threads are put on the queue,
    // thus at point 6 you block, well because there are no more worker Threads an no one add them.

    public void insertquote() {
        try {
            // 4. You try to take an Integer from the Pool of tasks from rach Thread, but there is nothing yet - it is empty, thus each Thread (worker)
            // blocks here, waiting for a task
            System.out.println("4.");
            Integer thisquote = taskqueueForEachWorker.take(); // This will successed only after 7.
            workerQueue.add(this);
        } catch (Exception ex) {
        }
    }
}

public static void main(String[] args) {
    Pool pool = new Pool(5);
    Random randomGenerator = new Random();

    for (int node = 0; node < 20; node++) {
        int d = randomGenerator.nextInt(5 * 200);
        System.out.println("5.");
        pool.execute(d);
    }
}

}

出力は1.2.3. 4. 2. 3. 4. 2. 3. 4. 2. 3. 4. 2. 3. 4. 5. 6. 7. 5.6.7になります。 5. 6. 7. 5. 6. 7. 5. 6. 7.5.6。

最後の行が6であることを確認してください。メソッドinsertQuoteが終了し、キューが空になったためにここでブロックされた場合、すべてのワーカースレッドが取得されています。

また、ワーカースレッドはそれぞれ個別のキューを使用しているため、「ワークスティーリング」パターンまたはDequeを実装する必要があるように思われます。それも調べてください。

于 2012-08-07T08:40:18.563 に答える
1

以下の私のリファクタリングを参照してください(まだ完璧ではありませんが、少し読みやすくなっています)。あなたの問題は次のとおりです。

  • 4つのワーカーを作成します(のコンストラクターで1から5までループしますthrottleheapthreadpool
  • 各ワーカーは別々のスレッドでinsertquote 1回実行され、アイドルプールに戻ります

insertquoteしたがって、全体として、完了した4つのジョブを送信し、ワーカーはキューに戻ります。その後、メソッドが終了したためにジョブを消費しないことを除いて、さらに4つのジョブ(合計8つ)を与えます。

insertquote解決策: whileループで実行します。

public void insertquote() {
    try {
        while (true) {
            taskqueue.take();
            idle.add(this);
        }
    } catch (Exception ex) {
    }
}

詳細については、コードの現在のバージョンを次に示します。

public class ThrottleHeapThreadPool {

    private final BlockingQueue<QuoteWorkerObject> idlechannels = new LinkedBlockingQueue<QuoteWorkerObject>();

    public static void main(String[] args) {
        ThrottleHeapThreadPool pool = new ThrottleHeapThreadPool(5, 200);
        Random randomGenerator = new Random();

        for (int node = 0; node < 20; node++) {
            int d = randomGenerator.nextInt(5 * 200);
            pool.execute(d);
        }
    }

    public ThrottleHeapThreadPool(int poolsize, int stocks) {

        for (int i = 1; i < poolsize; i++) {
            QuoteWorkerObject worker = new QuoteWorkerObject(idlechannels);
            idlechannels.add(worker);//All WORKERS to Idle pool to start
            worker.init();
        }
    }

    public void execute(Integer quote) {
        try {
            //extract worker from pool
            QuoteWorkerObject worker = idlechannels.take();
            worker.put(quote);
        } catch (InterruptedException e) {
        }
    }

    class QuoteWorkerObject {

        private final BlockingQueue<Integer> taskqueue = new LinkedBlockingQueue<Integer>();
        private final BlockingQueue<QuoteWorkerObject> idle;

        @SuppressWarnings("unchecked")
        public QuoteWorkerObject(BlockingQueue<QuoteWorkerObject> idlechannels) {
            this.idle = idlechannels;
        }

        public void init() {
            new Thread(new Runnable() {
                public void run() {
                    insertquote();
                }
            }).start();
        }

        public void put(Integer quote) {
            taskqueue.add(quote);
        }

        public void insertquote() {
            try {
                while (true) {
                    taskqueue.take();
                    idle.add(this);
                }
            } catch (Exception ex) {
            }
        }
    }
}
于 2012-08-06T17:39:00.033 に答える