199

完全な単純なシナリオ、つまり、特にキューでこれをどのように使用するかを提案するチュートリアルを入手できますか?

4

7 に答える 7

283

およびメソッドはwait()notify()特定の条件が満たされるまでスレッドをブロックできるようにするメカニズムを提供するように設計されています。このために、要素の固定サイズのバッキングストアがあるブロッキングキューの実装を記述したいとします。

最初に行う必要があるのは、メソッドが待機する条件を特定することです。この場合、put()ストアに空き領域ができるまでtake()メソッドをブロックし、返す要素ができるまでメソッドをブロックする必要があります。

public class BlockingQueue<T> {

    private Queue<T> queue = new LinkedList<T>();
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T element) throws InterruptedException {
        while(queue.size() == capacity) {
            wait();
        }

        queue.add(element);
        notify(); // notifyAll() for multiple producer/consumer threads
    }

    public synchronized T take() throws InterruptedException {
        while(queue.isEmpty()) {
            wait();
        }

        T item = queue.remove();
        notify(); // notifyAll() for multiple producer/consumer threads
        return item;
    }
}

待機および通知メカニズムを使用する方法について注意すべき点がいくつかあります。

wait()まず、コードの同期領域への呼び出しまたはnotify()同期領域内にあることを確認する必要があります(wait()とのnotify()呼び出しは同じオブジェクトで同期されます)。この理由(標準のスレッドセーフの懸念以外)は、信号の欠落と呼ばれるものが原因です。

この例として、キューがいっぱいになったときにスレッドが呼び出すput()場合があります。次に、条件をチェックし、キューがいっぱいであることを確認しますが、別のスレッドをブロックする前にスケジュールされます。次に、この2番目のスレッドtake()はキューの要素であり、キューがいっぱいではなくなったことを待機中のスレッドに通知します。ただし、最初のスレッドはすでに条件をチェックしているため、wait()進行する可能性がある場合でも、スケジュールが変更された後に呼び出すだけです。

take()共有オブジェクトで同期することにより、最初のスレッドが実際にブロックされるまで2番目のスレッドの呼び出しを進めることができないため、この問題が発生しないようにすることができます。

次に、スプ​​リアスウェイクアップと呼ばれる問題があるため、チェックしている条件をifステートメントではなくwhileループに入れる必要があります。notify()これは、待機中のスレッドが呼び出されずに再アクティブ化される場合がある場所です。このチェックをwhileループに入れると、誤ったウェイクアップが発生した場合に条件が再チェックされ、スレッドがwait()再度呼び出されます。


他の回答のいくつかが述べているように、Java 1.5はjava.util.concurrent、待機/通知メカニズムに対してより高いレベルの抽象化を提供するように設計された新しい同時実行ライブラリ(パッケージ内)を導入しました。これらの新機能を使用して、元の例を次のように書き直すことができます。

public class BlockingQueue<T> {

    private Queue<T> queue = new LinkedList<T>();
    private int capacity;
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            while(queue.size() == capacity) {
                notFull.await();
            }

            queue.add(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while(queue.isEmpty()) {
                notEmpty.await();
            }

            T item = queue.remove();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}

もちろん、実際にブロッキングキューが必要な場合は、 BlockingQueueインターフェイスの実装を使用する必要があります。

また、このようなものについては、並行性に関連する問題と解決策について知りたいと思う可能性のあるすべてを網羅しているため、Javaの並行性の実践を強くお勧めします。

于 2010-03-29T10:11:53.783 に答える
156

キューの例ではありませんが、非常に単純です:)

class MyHouse {
    private boolean pizzaArrived = false;

    public void eatPizza(){
        synchronized(this){
            while(!pizzaArrived){
                wait();
            }
        }
        System.out.println("yumyum..");
    }

    public void pizzaGuy(){
        synchronized(this){
             this.pizzaArrived = true;
             notifyAll();
        }
    }
}

いくつかの重要なポイント:
1)絶対にしないでください

 if(!pizzaArrived){
     wait();
 }

常にwhile(condition)を使用してください。

  • a)スレッドは、誰からも通知されることなく、待機状態から散発的に目覚める可能性があります。(ピザ屋がチャイムを鳴らさなかった場合でも、誰かがピザを食べてみることにしました。)
  • b)同期ロックを取得した後、状態を再度確認する必要があります。ピザが永遠に続くわけではないとしましょう。あなたは目を覚まし、ピザのラインナップを用意しましたが、それだけでは誰にとっても十分ではありません。チェックしないと紙を食べるかも!:)(おそらくより良い例は while(!pizzaExists){ wait(); }

2)wait / nofityを呼び出す前に、ロックを保持(同期)する必要があります。スレッドは、ウェイクアップする前にロックを取得する必要もあります。

3)同期されたブロック内でロックを取得しないようにし、エイリアンメソッド(それらが何をしているのかわからないメソッド)を呼び出さないようにします。必要な場合は、デッドロックを回避するための対策を講じてください。

4)notify()に注意してください。何をしているのかがわかるまで、notifyAll()を使い続けます。

5)最後になりましたが、実際のJava同時実行性をお読みください。

于 2010-03-29T09:51:04.523 に答える
42

wait()あなたが具体的に求めたとしてもnotify()、私はこの引用がまだ十分に重要であると感じています:

Josh Bloch、Effective Java 2nd Edition、Item 69:並行性ユーティリティを優先しwaitnotify(彼を強調):

正しく使用することの難しさを考えると、wait代わりnotifyに高水準の並行性ユーティリティを使用する必要があります[...]を直接使用waitするnotifyことは、によって提供される高水準言語と比較して、「並行性アセンブリ言語」でのプログラミングに似ていますjava.util.concurrent新しいコードで使用する理由は、あったとしてもめったにありませんwaitnotify

于 2010-03-29T09:16:33.890 に答える
10

このJavaチュートリアルを見たことがありますか?

さらに、実際のソフトウェアでこの種のものをいじるのは避けてください。それが何であるかを知っているようにそれで遊ぶのは良いことですが、並行性には至る所に落とし穴があります。他の人のためにソフトウェアを構築している場合は、より高いレベルの抽象化と同期されたコレクションまたはJMSキューを使用することをお勧めします。

それは少なくとも私がしていることです。私は並行性の専門家ではないので、可能な限り手動でスレッドを処理することは避けています。

于 2010-03-29T08:57:39.237 に答える
1

public class myThread extends Thread{
     @override
     public void run(){
        while(true){
           threadCondWait();// Circle waiting...
           //bla bla bla bla
        }
     }
     public synchronized void threadCondWait(){
        while(myCondition){
           wait();//Comminucate with notify()
        }
     }

}
public class myAnotherThread extends Thread{
     @override
     public void run(){
        //Bla Bla bla
        notify();//Trigger wait() Next Step
     }

}
于 2015-10-21T11:50:59.540 に答える
0

質問は、キュー(バッファ)を含むwait()+ notify()を要求します。最初に頭に浮かぶのは、バッファーを使用する生産者/消費者シナリオです。

システムの3つのコンポーネント:

  1. キュー[バッファ]-スレッド間で共有される固定サイズのキュー
  2. プロデューサー-スレッドが値を生成/バッファーに挿入します
  3. コンシューマー-スレッドはバッファーから値を消費/削除します

プロデューサースレッド:プロデューサーは、バッファーがいっぱいになるまで値をバッファーに挿入します。バッファがいっぱいになると、プロデューサはwait()を呼び出し、コンシューマがそれを起こすまで待機ステージに入ります。

    static class Producer extends Thread {
    private Queue<Integer> queue;
    private int maxSize;

    public Producer(Queue<Integer> queue, int maxSize, String name) {
        super(name);
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                if (queue.size() == maxSize) {
                    try {
                        System.out.println("Queue is full, " + "Producer thread waiting for " + "consumer to take something from queue");
                        queue.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                Random random = new Random();
                int i = random.nextInt();
                System.out.println(" ^^^ Producing value : " + i);
                queue.add(i);
                queue.notify();
            }
            sleepRandom();
        }
    }
}

CONSUMER THREAD: Consumer thread removes value from the buffer until the buffer is empty. If the buffer is empty, consumer calls wait() method and enter wait state until a producer sends a notify signal.

    static class Consumer extends Thread {
    private Queue<Integer> queue;
    private int maxSize;

    public Consumer(Queue<Integer> queue, int maxSize, String name) {
        super(name);
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        Random random = new Random();
        while (true) {
            synchronized (queue) {
                if (queue.isEmpty()) {
                    System.out.println("Queue is empty," + "Consumer thread is waiting" + " for producer thread to put something in queue");
                    try {
                        queue.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println(" vvv Consuming value : " + queue.remove());
                queue.notify();
            }
            sleepRandom();
        }
    }
}

UTIL METHOD:

    public static void sleepRandom(){
    Random random = new Random();
    try {
        Thread.sleep(random.nextInt(250));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Application Code:

    public static void main(String args[]) {
    System.out.println("How to use wait and notify method in Java");
    System.out.println("Solving Producer Consumper Problem");
    Queue<Integer> buffer = new LinkedList<>();
    int maxSize = 10;
    Thread producer = new Producer(buffer, maxSize, "PRODUCER");
    Thread consumer = new Consumer(buffer, maxSize, "CONSUMER");
    producer.start();
    consumer.start();
}

A Sample Output:

 ^^^ Producing value : 1268801606
 vvv Consuming value : 1268801606
Queue is empty,Consumer thread is waiting for producer thread to put something in queue
 ^^^ Producing value : -191710046
 vvv Consuming value : -191710046
 ^^^ Producing value : -1096119803
 vvv Consuming value : -1096119803
 ^^^ Producing value : -1502054254
 vvv Consuming value : -1502054254
Queue is empty,Consumer thread is waiting for producer thread to put something in queue
 ^^^ Producing value : 408960851
 vvv Consuming value : 408960851
 ^^^ Producing value : 2140469519
 vvv Consuming value : 65361724
 ^^^ Producing value : 1844915867
 ^^^ Producing value : 1551384069
 ^^^ Producing value : -2112162412
 vvv Consuming value : -887946831
 vvv Consuming value : 1427122528
 ^^^ Producing value : -181736500
 ^^^ Producing value : -1603239584
 ^^^ Producing value : 175404355
 vvv Consuming value : 1356483172
 ^^^ Producing value : -1505603127
 vvv Consuming value : 267333829
 ^^^ Producing value : 1986055041
Queue is full, Producer thread waiting for consumer to take something from queue
 vvv Consuming value : -1289385327
 ^^^ Producing value : 58340504
 vvv Consuming value : 1244183136
 ^^^ Producing value : 1582191907
Queue is full, Producer thread waiting for consumer to take something from queue
 vvv Consuming value : 1401174346
 ^^^ Producing value : 1617821198
 vvv Consuming value : -1827889861
 vvv Consuming value : 2098088641
于 2021-09-21T05:21:28.583 に答える
-1

スレッド化におけるwait()とnotifyall()の例。

同期された静的配列リストがリソースとして使用され、配列リストが空の場合はwait()メソッドが呼び出されます。配列リストに要素が追加されると、notify()メソッドが呼び出されます。

public class PrinterResource extends Thread{

//resource
public static List<String> arrayList = new ArrayList<String>();

public void addElement(String a){
    //System.out.println("Add element method "+this.getName());
    synchronized (arrayList) {
        arrayList.add(a);
        arrayList.notifyAll();
    }
}

public void removeElement(){
    //System.out.println("Remove element method  "+this.getName());
    synchronized (arrayList) {
        if(arrayList.size() == 0){
            try {
                arrayList.wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }else{
            arrayList.remove(0);
        }
    }
}

public void run(){
    System.out.println("Thread name -- "+this.getName());
    if(!this.getName().equalsIgnoreCase("p4")){
        this.removeElement();
    }
    this.addElement("threads");

}

public static void main(String[] args) {
    PrinterResource p1 = new PrinterResource();
    p1.setName("p1");
    p1.start();

    PrinterResource p2 = new PrinterResource();
    p2.setName("p2");
    p2.start();


    PrinterResource p3 = new PrinterResource();
    p3.setName("p3");
    p3.start();


    PrinterResource p4 = new PrinterResource();
    p4.setName("p4");
    p4.start();     

    try{
        p1.join();
        p2.join();
        p3.join();
        p4.join();
    }catch(InterruptedException e){
        e.printStackTrace();
    }
    System.out.println("Final size of arraylist  "+arrayList.size());
   }
}
于 2016-03-22T09:41:01.087 に答える