6

ArrayBlockingQueue を使用するワーカーがいくつかあります。

すべてのワーカーはキューから 1 つのオブジェクトを取得して処理し、その結果、さらに処理するためにキューに入れられる複数のオブジェクトを取得できます。つまり、労働者=生産者+消費者です。

ワーカー:

public class Worker implements Runnable
{
    private BlockingQueue<String> processQueue = null;

    public Worker(BlockingQueue<String> processQueue)
    {
        this.processQueue = processQueue;
    }

    public void run()
    {
        try
        {
            do
            {
                String item = this.processQueue.take();
                ArrayList<String> resultItems = this.processItem(item);

                for(String resultItem : resultItems)
                {
                    this.processQueue.put(resultItem);
                }
            }
            while(true);
        }
        catch(Exception)
        {
            ...
        }
    }

    private ArrayList<String> processItem(String item) throws Exception
    {
        ...
    }
}

主要:

public class Test
{
    public static void main(String[] args) throws Exception
    {
        new Test().run();
    }

    private void run() throws Exception
    {
        BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
        processQueue.put("lalala");

        Executor service = Executors.newFixedThreadPool(100);
        for(int i=0; i<100; ++i)
        {
            service.execute(new Worker(processQueue));
        }
    }
}

仕事がなくなったときにワーカーを停止する最良の方法は何ですか?

まず、私が念頭に置いているのは、キューにあるアイテムの数と現在処理中のアイテムの数を定期的に確認することです。両方がゼロの場合は、ExecutorService で「shutdownNow()」などを実行します。しかし、これが最善の方法であるかどうかはわかりません。

4

3 に答える 3

2

やるべき仕事がこれ以上ない場合は、その旨を伝えるメッセージをキューに入れ、ワーカーが自分の都合でシャットダウンするようにします。これは、データの破損を防ぐ良い方法です。

すべてのワーカーが帰宅したことを別のスレッドに通知する必要がある場合は、 a を使用しCountDownLatchてそうすることができます。

于 2012-04-02T21:08:27.200 に答える
1

あなたがあなたの解決策を持っているように聞こえます-別の進行中のキューを使用してください。そのサイズは現在処理されているアイテムの数になります。いずれかのキューへのアクセスがsynchronized(theArrayBlockingQueue)ブロック単位であるという規則を使用する場合は、すべてがうまくいくはずです。特に、アイテムを処理状態に移動するときは、そのアイテムをArrayBlockingQueueから削除し、同じ同期ブロック内のprocessingQueueに追加します。

于 2012-04-02T21:16:42.620 に答える
0

私はあなたのコードを少し修正しました、それがあなたが期待するものであるかどうかはわかりませんが、少なくともそれは終了します!shutdownNow代わりにを使用するとshutdown、ワーカーが中断され、ワー​​カーを仕事に戻さない限り、キューが空であるという保証なしに終了します。

public class Test {

    public static void main(String[] args) throws Exception {
        new Test().run();
    }

    private void run() throws Exception {
        BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
        processQueue.put("lalalalalalalalalalalalala"); //a little longer to make sure there is enough to process

        ExecutorService service = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 100; ++i) {
            service.execute(new Worker(processQueue));
        }
        service.shutdown(); //orderly shutdown = lets the tasks terminate what they are doing
        service.awaitTermination(1, TimeUnit.SECONDS); //blocks until all tasks have finished or throws TimeOutException if timeout is reached
    }

    public static class Worker implements Runnable {

        private BlockingQueue<String> processQueue = null;
        private int count = 0;

        public Worker(BlockingQueue<String> processQueue) {
            this.processQueue = processQueue;
        }

        @Override
        public void run() {
            try {
                do {
                    //tries to get something from the queue for 100ms and returns null if it could not get anything
                    String item = this.processQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (item == null) break; //Ends the job because the queue was empty
                    count++;
                    List<String> resultItems = this.processItem(item);

                    for (String resultItem : resultItems) {
                        this.processQueue.put(resultItem);
                    }
                } while (true);
            } catch (InterruptedException e) {
                System.out.println("Interrupted");
                Thread.currentThread().interrupt();
            }
            if (count != 0) System.out.println(Thread.currentThread() + ": processed " + count + " entries");
        }

        private List<String> processItem(String item) { //let's put the string back less final character
            if (item.isEmpty()) {
                return Collections.<String> emptyList();
            } else {
                return Arrays.asList(item.substring(0, item.length() - 1));
            }
        }
    }
}
于 2012-04-04T08:55:33.393 に答える