5

異なるスレッドがキュー(プロデューサー)にデータを入力し、1つのコンシューマーがこのキューから要素を取得する状況があります。私の問題は、これらの要素の1つがキューから取得されるときに、一部が欠落していることです(信号が欠落していますか?)。プロデューサーコードは次のとおりです。

class Producer implements Runnable {

    private Consumer consumer;

    Producer(Consumer consumer) { this.consumer = consumer; }

    @Override
public void run() {
    consumer.send("message");
  }
}

そして、それらは次のもので作成および実行されます。

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
  executor.execute(new Producer(consumer));
}

消費者コードは次のとおりです。

class Consumer implements Runnable {

private Queue<String> queue = new ConcurrentLinkedQueue<String>();

void send(String message) {
    synchronized (queue) {
        queue.add(message);
        System.out.println("SIZE: " + queue.size());
        queue.notify();
    }
}

@Override
public void run() {
    int counter = 0;
    synchronized (queue) {
    while(true) {
        try {
            System.out.println("SLEEP");
                queue.wait(10);
        } catch (InterruptedException e) {
                Thread.interrupted();
        }
        System.out.println(counter);
        if (!queue.isEmpty()) {             
            queue.poll();
            counter++;
        }
    }
    }
}

}

コードを実行すると、20個の要素が追加され、20個が取得されることがありますが、それ以外の場合、取得される要素は20未満です。これを修正する方法はありますか?

4

3 に答える 3

10

キューの代わりにBlockingQueueを使用することをお勧めします。LinkedBlockingDequeはあなたにとって良い候補かもしれません。

コードは次のようになります。

void send(String message) {
    synchronized (queue) {
        queue.put(message);
        System.out.println("SIZE: " + queue.size());
    }
}

そして、あなたはただする必要があるでしょう

queue.take()

コンシューマースレッドで

.take()は、アイテムがキューで使用可能になるまでブロックし、次に正確に1つを返すという考え方です(これは、実装が苦しんでいると思う場所です:ポーリング中に通知がありません)。.put()は、すべての通知を行う責任があります。待機/通知は必要ありません。

于 2012-04-04T07:37:32.913 に答える
2

コードの問題は、おそらくnotifyの代わりにを使用していることが原因ですnotifyAll。前者は、ロックを待機しているスレッドがある場合、単一のスレッドのみをウェイクアップします。これにより、待機中のスレッドがなく、信号が失われる競合状態が発生します。notifyAllは、すべてのスレッドがロックを取得できるかどうかを確認するためにウェイクアップすることを要求することにより、わずかなパフォーマンスコストで正確さを強制します。

これは、Effective Java 1st ed(p.150を参照)で最もよく説明されています。プログラマーはより強力な正確性の保証を提供するjava.util.concurrentを使用することが期待されているため、第2版ではこのヒントが削除されました。

于 2012-04-04T08:27:21.660 に答える
2

ConcurrentLinkedQueueと同期の両方を同時に使用するのは悪い考えのようです。そもそも、同時データ構造の目的に反します。

ConcurrentLinkedQueueデータ構造に問題はなく、BlockingQueueに置き換えると問題は解決しますが、これが根本的な原因ではありません。

問題はqueue.wait(10)にあります。これは時限待機方式です。10ms経過すると再度ロックを取得します。

  1. 通知(queue.notify())は、10ミリ秒が経過すると待機しているコンシューマースレッドがないため、失われます。

  2. ロックがコンシューマーによって再度要求されるため、プロデューサーはロックを取得できないため、キューに追加できません。

BlockingQueueに移動すると、wait(10)コードが削除され、BlockingQueueデータ構造によって待機と通知が処理されたため、問題が解決しました。

于 2014-09-03T17:16:34.807 に答える