0

http://en.wikipedia.org/wiki/Producer-consumer_problemによると、セマフォを使用して P/C 問題をシミュレートしたいと考えています。デッドロックが発生しており、何が問題なのかわかりません。

public static void main(String[] args) {
        CustomBlockingQueue blockingQueue = new CustomBlockingQueue();
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }
}

@SuppressWarnings("serial")
class CustomBlockingQueue extends LinkedList<Object> {
    private static final int MAX_SIZE = 10;

    private Semaphore mutex = new Semaphore(1);
    private Semaphore fillCount = new Semaphore(0);
    private Semaphore emptyCount = new Semaphore(MAX_SIZE);

    @Override
    public boolean offer(Object e) {
        try {
            mutex.acquire();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        boolean result = super.offer(e);
        System.out.println("offer " + size());
        try {
            fillCount.release();
            emptyCount.acquire();
            mutex.release();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        return result;
    }

    @Override
    public Object poll() {
        try {
            mutex.acquire();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        Object result = super.poll();
        System.out.println("poll  " + size());
        try {
            emptyCount.release();
            fillCount.acquire();
            mutex.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

class Producer implements Runnable {
    private CustomBlockingQueue blockingQueue;
    private Random random = new Random();

    public Producer(CustomBlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(2));
                blockingQueue.offer(new Object());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private CustomBlockingQueue blockingQueue;
    private Random random = new Random();

    public Consumer(CustomBlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(4));
                blockingQueue.poll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

セマフォの使用

セマフォは、ウェイクアップ コールの損失の問題を解決します。以下のソリューションでは、fillCount と emptyCount の 2 つのセマフォを使用して問題を解決しています。fillCount はバッファ内で読み取られるアイテムの数であり、 emptyCount はアイテムを書き込むことができるバッファ内の使用可能なスペースの数です。新しい項目がバッファに入れられると、fillCount が増分され、emptyCount が減分されます。値がゼロのときにプロデューサーが emptyCount をデクリメントしようとすると、プロデューサーはスリープ状態になります。次にアイテムが消費されると、emptyCount がインクリメントされ、プロデューサーが起動します。コンシューマーも同様に機能します。

4

2 に答える 2

2

BlockingQueue代わりに、ミューテックスのロックと待機のケースを取るを使用することを検討してください。

余談ですが、プロデューサー/コンシューマーの競合状態 (偽の割り込みとは対照的に) を示す古いページがあります。しかし、私の実装ではセマフォを使用していないため、役立つかどうかはわかりません。

http://256stuff.com/gray/docs/misc/producer_consumer_race_conditions/

于 2012-04-04T22:16:37.553 に答える
2

ロックの順序が間違っています:

提供する必要があります:

        emptyCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        fillCount.release();

ポーリングに必要な同様の変更:

        fillCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        emptyCount.release();

あなたの実装では、セマフォを解放するために他のスレッドがミューテックスを待っている可能性があるため、ミューテックスを保持している間にセマフォを待っています。

于 2012-04-04T22:18:01.427 に答える