http://en.wikipedia.org/wiki/Producer-consumer_problemによると、セマフォを使用して P/C 問題をシミュレートしたいと考えています。デッドロックが発生しており、何が問題なのかわかりません。
public static void main(String[] args) {
CustomBlockingQueue blockingQueue = new CustomBlockingQueue();
new Thread(new Producer(blockingQueue)).start();
new Thread(new Consumer(blockingQueue)).start();
}
}
@SuppressWarnings("serial")
class CustomBlockingQueue extends LinkedList<Object> {
private static final int MAX_SIZE = 10;
private Semaphore mutex = new Semaphore(1);
private Semaphore fillCount = new Semaphore(0);
private Semaphore emptyCount = new Semaphore(MAX_SIZE);
@Override
public boolean offer(Object e) {
try {
mutex.acquire();
} catch (InterruptedException e2) {
e2.printStackTrace();
}
boolean result = super.offer(e);
System.out.println("offer " + size());
try {
fillCount.release();
emptyCount.acquire();
mutex.release();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
return result;
}
@Override
public Object poll() {
try {
mutex.acquire();
} catch (InterruptedException e2) {
e2.printStackTrace();
}
Object result = super.poll();
System.out.println("poll " + size());
try {
emptyCount.release();
fillCount.acquire();
mutex.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
}
class Producer implements Runnable {
private CustomBlockingQueue blockingQueue;
private Random random = new Random();
public Producer(CustomBlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
TimeUnit.SECONDS.sleep(random.nextInt(2));
blockingQueue.offer(new Object());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private CustomBlockingQueue blockingQueue;
private Random random = new Random();
public Consumer(CustomBlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
TimeUnit.SECONDS.sleep(random.nextInt(4));
blockingQueue.poll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
セマフォの使用
セマフォは、ウェイクアップ コールの損失の問題を解決します。以下のソリューションでは、fillCount と emptyCount の 2 つのセマフォを使用して問題を解決しています。fillCount はバッファ内で読み取られるアイテムの数であり、 emptyCount はアイテムを書き込むことができるバッファ内の使用可能なスペースの数です。新しい項目がバッファに入れられると、fillCount が増分され、emptyCount が減分されます。値がゼロのときにプロデューサーが emptyCount をデクリメントしようとすると、プロデューサーはスリープ状態になります。次にアイテムが消費されると、emptyCount がインクリメントされ、プロデューサーが起動します。コンシューマーも同様に機能します。