7

生産者/消費者パターンについての私の理解は、生産者と消費者の間で共有されるキューを使用して実装できるということです。プロデューサーは作業を共有キューに送信し、コンシューマーはそれを取得して処理します。また、プロデューサーがコンシューマーに直接送信することによって実装することもできます(プロデューサースレッドがコンシューマーのエグゼキューターサービスに直接送信します)。 

今、私はスレッドプールのいくつかの一般的な実装を提供するExecutorsクラスを見てきました。仕様によると、メソッドnewFixedThreadPoolは、「共有された無制限のキューで動作する固定数のスレッドを再利用します」。彼らはここでどのキューについて話しているのですか? 

プロデューサーがタスクをコンシューマーに直接送信する場合、ランナブルのリストを含むのはExecutorServiceの内部キューですか?

それとも、プロデューサーが共有キューに送信する場合の中間キューですか? 

私は全体のポイントを逃しているかもしれませんが、誰かが明確にしてください?

4

3 に答える 3

4

あなたは正しいです、ExecutorServiceそれはスレッドプールであるだけでなく、完全な生産者/消費者実装です。この内部キューは、実際には、タスクを保持しているRunnable(正確には)スレッドセーフなキューです。FutureTasksubmit()

プール内のすべてのスレッドはそのキューでブロックされ、タスクが実行されるのを待ちます。タスクを実行submit()すると、1つのスレッドがそれを取得して実行します。もちろんsubmit()、プール内のスレッドが処理を終了するのを待っているわけではありません。

一方、膨大な数のタスク(または長時間実行されるタスク)を送信すると、プール内のすべてのスレッドが占有され、一部のタスクがキューで待機することになります。スレッドのタスクが完了すると、すぐにキューから最初のスレッドが選択されます。

于 2011-08-11T20:16:55.077 に答える
1
public class Producer extends Thread {  
    static List<String> list = new ArrayList<String>();  

    public static void main(String[] args) {  
        ScheduledExecutorService executor = Executors  
                .newScheduledThreadPool(12);  
        int initialDelay = 5;  
        int pollingFrequency = 5;  
        Producer producer = new Producer();  
        @SuppressWarnings({ "rawtypes", "unused" })  
        ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay(  
                producer, initialDelay, pollingFrequency, TimeUnit.SECONDS);  
        for (int i = 0; i < 3; i++) {  
            Consumer consumer = new Consumer();  
            @SuppressWarnings({ "rawtypes", "unused" })  
            ScheduledFuture schedFutureConsumer = executor  
                    .scheduleWithFixedDelay(consumer, initialDelay,  
                            pollingFrequency, TimeUnit.SECONDS);  
        }  

    }  

    @Override  
    public void run() {  
        list.add("object added to list is " + System.currentTimeMillis());  
                              ///adding in list become slow also because of synchronized behavior  
    }  
}  

class Consumer extends Thread {  

    @Override  
    public void run() {  
        getObjectFromList();  
    }  

    private void getObjectFromList() {  
        synchronized (Producer.list) {  
            if (Producer.list.size() > 0) {  
                System.out.println("Object name removed by "  
                        + Thread.currentThread().getName() + "is "  
                        + Producer.list.get(0));  
                Producer.list.remove(Producer.list.get(0));  
            }  
        }  
    }  
}  
于 2012-06-12T09:59:22.860 に答える
0

これをチェックしてください:
Javaでの生産者/消費者の例(RabbitMQ)(これは別のライブラリ用に書かれていますが、Javaであり、概念を明確に示しています;)お
役に立てば幸いです。

PS:実際には、いくつかの例がありますが、あなたはアイデアを得る;)

于 2011-08-11T22:47:03.363 に答える