完全な単純なシナリオ、つまり、特にキューでこれをどのように使用するかを提案するチュートリアルを入手できますか?
7 に答える
およびメソッドはwait()
、notify()
特定の条件が満たされるまでスレッドをブロックできるようにするメカニズムを提供するように設計されています。このために、要素の固定サイズのバッキングストアがあるブロッキングキューの実装を記述したいとします。
最初に行う必要があるのは、メソッドが待機する条件を特定することです。この場合、put()
ストアに空き領域ができるまでtake()
メソッドをブロックし、返す要素ができるまでメソッドをブロックする必要があります。
public class BlockingQueue<T> {
private Queue<T> queue = new LinkedList<T>();
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public synchronized void put(T element) throws InterruptedException {
while(queue.size() == capacity) {
wait();
}
queue.add(element);
notify(); // notifyAll() for multiple producer/consumer threads
}
public synchronized T take() throws InterruptedException {
while(queue.isEmpty()) {
wait();
}
T item = queue.remove();
notify(); // notifyAll() for multiple producer/consumer threads
return item;
}
}
待機および通知メカニズムを使用する方法について注意すべき点がいくつかあります。
wait()
まず、コードの同期領域への呼び出しまたはnotify()
同期領域内にあることを確認する必要があります(wait()
とのnotify()
呼び出しは同じオブジェクトで同期されます)。この理由(標準のスレッドセーフの懸念以外)は、信号の欠落と呼ばれるものが原因です。
この例として、キューがいっぱいになったときにスレッドが呼び出すput()
場合があります。次に、条件をチェックし、キューがいっぱいであることを確認しますが、別のスレッドをブロックする前にスケジュールされます。次に、この2番目のスレッドtake()
はキューの要素であり、キューがいっぱいではなくなったことを待機中のスレッドに通知します。ただし、最初のスレッドはすでに条件をチェックしているため、wait()
進行する可能性がある場合でも、スケジュールが変更された後に呼び出すだけです。
take()
共有オブジェクトで同期することにより、最初のスレッドが実際にブロックされるまで2番目のスレッドの呼び出しを進めることができないため、この問題が発生しないようにすることができます。
次に、スプリアスウェイクアップと呼ばれる問題があるため、チェックしている条件をifステートメントではなくwhileループに入れる必要があります。notify()
これは、待機中のスレッドが呼び出されずに再アクティブ化される場合がある場所です。このチェックをwhileループに入れると、誤ったウェイクアップが発生した場合に条件が再チェックされ、スレッドがwait()
再度呼び出されます。
他の回答のいくつかが述べているように、Java 1.5はjava.util.concurrent
、待機/通知メカニズムに対してより高いレベルの抽象化を提供するように設計された新しい同時実行ライブラリ(パッケージ内)を導入しました。これらの新機能を使用して、元の例を次のように書き直すことができます。
public class BlockingQueue<T> {
private Queue<T> queue = new LinkedList<T>();
private int capacity;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public void put(T element) throws InterruptedException {
lock.lock();
try {
while(queue.size() == capacity) {
notFull.await();
}
queue.add(element);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while(queue.isEmpty()) {
notEmpty.await();
}
T item = queue.remove();
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
}
もちろん、実際にブロッキングキューが必要な場合は、 BlockingQueueインターフェイスの実装を使用する必要があります。
また、このようなものについては、並行性に関連する問題と解決策について知りたいと思う可能性のあるすべてを網羅しているため、Javaの並行性の実践を強くお勧めします。
キューの例ではありませんが、非常に単純です:)
class MyHouse {
private boolean pizzaArrived = false;
public void eatPizza(){
synchronized(this){
while(!pizzaArrived){
wait();
}
}
System.out.println("yumyum..");
}
public void pizzaGuy(){
synchronized(this){
this.pizzaArrived = true;
notifyAll();
}
}
}
いくつかの重要なポイント:
1)絶対にしないでください
if(!pizzaArrived){
wait();
}
常にwhile(condition)を使用してください。
- a)スレッドは、誰からも通知されることなく、待機状態から散発的に目覚める可能性があります。(ピザ屋がチャイムを鳴らさなかった場合でも、誰かがピザを食べてみることにしました。)
- b)同期ロックを取得した後、状態を再度確認する必要があります。ピザが永遠に続くわけではないとしましょう。あなたは目を覚まし、ピザのラインナップを用意しましたが、それだけでは誰にとっても十分ではありません。チェックしないと紙を食べるかも!:)(おそらくより良い例は
while(!pizzaExists){ wait(); }
。
2)wait / nofityを呼び出す前に、ロックを保持(同期)する必要があります。スレッドは、ウェイクアップする前にロックを取得する必要もあります。
3)同期されたブロック内でロックを取得しないようにし、エイリアンメソッド(それらが何をしているのかわからないメソッド)を呼び出さないようにします。必要な場合は、デッドロックを回避するための対策を講じてください。
4)notify()に注意してください。何をしているのかがわかるまで、notifyAll()を使い続けます。
5)最後になりましたが、実際のJava同時実行性をお読みください。
wait()
あなたが具体的に求めたとしてもnotify()
、私はこの引用がまだ十分に重要であると感じています:
Josh Bloch、Effective Java 2nd Edition、Item 69:並行性ユーティリティを優先しwait
、notify
(彼を強調):
正しく使用することの難しさを考えると、
wait
代わりnotify
に高水準の並行性ユーティリティを使用する必要があります[...]を直接使用wait
するnotify
ことは、によって提供される高水準言語と比較して、「並行性アセンブリ言語」でのプログラミングに似ていますjava.util.concurrent
。新しいコードで使用する理由は、あったとしてもめったにありませんwait
notify
。
このJavaチュートリアルを見たことがありますか?
さらに、実際のソフトウェアでこの種のものをいじるのは避けてください。それが何であるかを知っているようにそれで遊ぶのは良いことですが、並行性には至る所に落とし穴があります。他の人のためにソフトウェアを構築している場合は、より高いレベルの抽象化と同期されたコレクションまたはJMSキューを使用することをお勧めします。
それは少なくとも私がしていることです。私は並行性の専門家ではないので、可能な限り手動でスレッドを処理することは避けています。
例
public class myThread extends Thread{
@override
public void run(){
while(true){
threadCondWait();// Circle waiting...
//bla bla bla bla
}
}
public synchronized void threadCondWait(){
while(myCondition){
wait();//Comminucate with notify()
}
}
}
public class myAnotherThread extends Thread{
@override
public void run(){
//Bla Bla bla
notify();//Trigger wait() Next Step
}
}
質問は、キュー(バッファ)を含むwait()+ notify()を要求します。最初に頭に浮かぶのは、バッファーを使用する生産者/消費者シナリオです。
システムの3つのコンポーネント:
- キュー[バッファ]-スレッド間で共有される固定サイズのキュー
- プロデューサー-スレッドが値を生成/バッファーに挿入します
- コンシューマー-スレッドはバッファーから値を消費/削除します
プロデューサースレッド:プロデューサーは、バッファーがいっぱいになるまで値をバッファーに挿入します。バッファがいっぱいになると、プロデューサはwait()を呼び出し、コンシューマがそれを起こすまで待機ステージに入ります。
static class Producer extends Thread {
private Queue<Integer> queue;
private int maxSize;
public Producer(Queue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
if (queue.size() == maxSize) {
try {
System.out.println("Queue is full, " + "Producer thread waiting for " + "consumer to take something from queue");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
Random random = new Random();
int i = random.nextInt();
System.out.println(" ^^^ Producing value : " + i);
queue.add(i);
queue.notify();
}
sleepRandom();
}
}
}
CONSUMER THREAD: Consumer thread removes value from the buffer until the buffer is empty. If the buffer is empty, consumer calls wait() method and enter wait state until a producer sends a notify signal.
static class Consumer extends Thread {
private Queue<Integer> queue;
private int maxSize;
public Consumer(Queue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
Random random = new Random();
while (true) {
synchronized (queue) {
if (queue.isEmpty()) {
System.out.println("Queue is empty," + "Consumer thread is waiting" + " for producer thread to put something in queue");
try {
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println(" vvv Consuming value : " + queue.remove());
queue.notify();
}
sleepRandom();
}
}
}
UTIL METHOD:
public static void sleepRandom(){
Random random = new Random();
try {
Thread.sleep(random.nextInt(250));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Application Code:
public static void main(String args[]) {
System.out.println("How to use wait and notify method in Java");
System.out.println("Solving Producer Consumper Problem");
Queue<Integer> buffer = new LinkedList<>();
int maxSize = 10;
Thread producer = new Producer(buffer, maxSize, "PRODUCER");
Thread consumer = new Consumer(buffer, maxSize, "CONSUMER");
producer.start();
consumer.start();
}
A Sample Output:
^^^ Producing value : 1268801606
vvv Consuming value : 1268801606
Queue is empty,Consumer thread is waiting for producer thread to put something in queue
^^^ Producing value : -191710046
vvv Consuming value : -191710046
^^^ Producing value : -1096119803
vvv Consuming value : -1096119803
^^^ Producing value : -1502054254
vvv Consuming value : -1502054254
Queue is empty,Consumer thread is waiting for producer thread to put something in queue
^^^ Producing value : 408960851
vvv Consuming value : 408960851
^^^ Producing value : 2140469519
vvv Consuming value : 65361724
^^^ Producing value : 1844915867
^^^ Producing value : 1551384069
^^^ Producing value : -2112162412
vvv Consuming value : -887946831
vvv Consuming value : 1427122528
^^^ Producing value : -181736500
^^^ Producing value : -1603239584
^^^ Producing value : 175404355
vvv Consuming value : 1356483172
^^^ Producing value : -1505603127
vvv Consuming value : 267333829
^^^ Producing value : 1986055041
Queue is full, Producer thread waiting for consumer to take something from queue
vvv Consuming value : -1289385327
^^^ Producing value : 58340504
vvv Consuming value : 1244183136
^^^ Producing value : 1582191907
Queue is full, Producer thread waiting for consumer to take something from queue
vvv Consuming value : 1401174346
^^^ Producing value : 1617821198
vvv Consuming value : -1827889861
vvv Consuming value : 2098088641
スレッド化におけるwait()とnotifyall()の例。
同期された静的配列リストがリソースとして使用され、配列リストが空の場合はwait()メソッドが呼び出されます。配列リストに要素が追加されると、notify()メソッドが呼び出されます。
public class PrinterResource extends Thread{
//resource
public static List<String> arrayList = new ArrayList<String>();
public void addElement(String a){
//System.out.println("Add element method "+this.getName());
synchronized (arrayList) {
arrayList.add(a);
arrayList.notifyAll();
}
}
public void removeElement(){
//System.out.println("Remove element method "+this.getName());
synchronized (arrayList) {
if(arrayList.size() == 0){
try {
arrayList.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else{
arrayList.remove(0);
}
}
}
public void run(){
System.out.println("Thread name -- "+this.getName());
if(!this.getName().equalsIgnoreCase("p4")){
this.removeElement();
}
this.addElement("threads");
}
public static void main(String[] args) {
PrinterResource p1 = new PrinterResource();
p1.setName("p1");
p1.start();
PrinterResource p2 = new PrinterResource();
p2.setName("p2");
p2.start();
PrinterResource p3 = new PrinterResource();
p3.setName("p3");
p3.start();
PrinterResource p4 = new PrinterResource();
p4.setName("p4");
p4.start();
try{
p1.join();
p2.join();
p3.join();
p4.join();
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("Final size of arraylist "+arrayList.size());
}
}