1

私はJavaのスレッドに関してプロデューサーとコンシューマーのデザインパターンを調査していましたが、最近Java5で調査しました。Java5でのBlockingQueueデータ構造の導入により、BlockingQueueはブロッキングメソッドを導入することでこの制御を暗黙的に提供するため、はるかに簡単になりました。 put()およびtake()。これで、プロデューサーとコンシューマーの間で通信するために待機と通知を使用する必要がなくなりました。BlockingQueue put()メソッドは、制限付きキューの場合にキューがいっぱいの場合にブロックし、キューが空の場合にtake()がブロックします。次のセクションでは、プロデューサーコンシューマーデザインパターンのコード例を示します。私は以下のプログラムを開発しましたが、waut()とnotify()の古いスタイルのアプローチも教えてください。古いスタイルのアプローチでも同じロジックを開発したいと思います。

皆さんは、これをどのように実装できるかをアドバイスしてください。従来の方法では、wait()メソッドとnotify()メソッドを使用して、プロデューサースレッドとコンシューマースレッドの間で通信し、フルキューや空のキューなどの個別の条件でそれぞれをブロックします...?

    import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){

     //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();

     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));

     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }

}

//Producer Class in java
class Producer implements Runnable {

    private final BlockingQueue sharedQueue;

    public Producer(BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}

//Consumer Class in Java
class Consumer implements Runnable{

    private final BlockingQueue sharedQueue;

    public Consumer (BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }


}

Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9
4

3 に答える 3

5

これを行う別の方法を知りたい場合は、 ExecutorService を使用してみてください

public static void main(String... args) {
    ExecutorService service = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 100; i++) {
        System.out.println("Produced: " + i);

        final int finalI = i;
        service.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("Consumed: " + finalI);
            }
        });
    }
    service.shutdown();
}

わずか 10 個のタスクで、コンシューマーが開始する前にプロデューサーを終了できます。100 個のタスクを試すと、それらがインターリーブされていることがわかります。

于 2012-08-15T08:43:30.797 に答える
2

教育目的で BlockingQueue の仕組みを理解したい場合は、いつでもそのソース コードを確認できます。

最も簡単な方法はsynchronize、メソッドoffer()とメソッドを使用することで、キューがいっぱいになり、誰かが要素を呼び出そtake()うとする場合です。誰かが要素を取得しているとき、眠っているスレッド。(空のキューから しようとするときと同じ考え)。すべての呼び出しが、スレッドが起動されるたびに条件が満たされているかどうかを確認するループにネストされていることを確認してください。offer()wait()notify()take()
wait()

製品の目的でゼロから実装することを計画している場合は、強く反対します。可能な限り、既存のテスト済みのライブラリとコンポーネントを使用する必要があります。

于 2012-08-15T08:36:24.737 に答える
1

私はこの待機を行うことができます-睡眠中に通知する(または少なくとも私はできると思います)。Java 1.4ソースは、これらすべての美しい例を提供しましたが、それらはすべてをアトミックで実行するように切り替えられており、現在ははるかに複雑になっています。wait-notifyは柔軟性とパワーを提供しますが、他の方法は同時実行の危険からあなたを保護し、より単純なコードを作ることができます。

これを行うには、次のようないくつかのフィールドが必要です。

private final ConcurrentLinkedQueue<Intger>  sharedQueue =
                                                    new ConcurrentLinkedQueue<>();
private volatile   boolean  waitFlag = true;

Producer.runは次のようになります。

public void run()  {
    for (int i = 0; i < 100000, i++)  {
        System.out.println( "Produced: " + i );
        sharedQueue.add( new Integer( i ) );
        if (waitFlag)       // volatile access is cheaper than synch.
            synchronized (sharedQueue)  { sharedQueue.notifyAll(); }
    }
}

そしてConsumer.run:

public void run()  {
    waitFlag = false;
    for (;;)  {
        Integer  ic = sharedQueue.poll();
        if (ic == null)  {
            synchronized (sharedQueue)  {
                waitFlag = true;
                // An add might have come through before waitFlag was set.
                ic = sharedQueue.poll();
                if (ic == null)  {
                    try  { sharedQueue.wait(); }
                    catch (InterruptedException ex)  {}
                    waitFlag = false;
                    continue;
                }
                waitFlag = true;
            }
        }
        System.out.println( "Consumed: " + ic );
    }
}

これにより、同期が最小限に抑えられます。すべてがうまくいけば、追加ごとに揮発性フィールドを1回だけ確認できます。任意の数のプロデューサーを同時に実行できる必要があります。(消費者の方が難しいでしょう。あきらめる必要がありますwaitFlag。)wait/notifyAllに別のオブジェクトを使用できます。

于 2012-08-15T15:32:00.123 に答える