0

私は典型的な生産者/消費者プログラムを書きました。1人のプロデューサーがキューを埋める責任があります。キューのデキューを担当する1人のコンシューマー。以下は私の実装です。

トリッキーな部分は、別の共有変数vFlagがあることです。process(str)は、そのtrueに変更する可能性があります。これは、プロデューサースレッドによって検出され、キューを空にします。

private class PQueue
{
    private Queue<String> queue;
    PQueue()
    {
        queue = new LinkedList<String>();
    }

    public synchronized void add(String str)
    {
        if(queue.size() > 1)
        {
        wait();
        }
        queue.add(token);
        notify();
    }

    public synchronized String poll()
    {
        if(queue.size() == 0)
        {
            wait();
        }
        String str = queue.poll();
        notify();
        return str;
    }

    public synchronized void clear()
    {
        queue.clear();
    }

}

PQueue queue = new PQueue();
private class Producer implements Runnable
{
    public void run()
    {
        while (true) {
            String str = read();
            queue.add(str);

            if(vFlag.value == false)
            {
                queue.clear();
                vFlag.value = true;
            }

            if (str.equals("end"))
                break;
        }           
        exitFlag = true;
    }
}

private class Consumer implements Runnable
{   
    public void run()
    {
        while(exitFlag == false)
        {
            String str = queue.poll();
            process(str, vFlag);
        }
    }
}

現在、まだいくつかのバグがあります。並行性のバグを回避するためのより優れた並行プログラミングの哲学やメカニズムなど、一般的な手がかりや提案を誰かに教えてもらえますか?

上記の実装では、プログラムがプロセスのエラーを報告することがあります(str、vFlag)

Exception in thread "Thread-3" java.lang.NullPointerException

それ以外の場合、プログラムは正常に実行されます。queue.clear()をコメントアウトすると、うまく機能します。

4

2 に答える 2

2

ブール値が複数のスレッドによって変更されているため、vFlag.valueフィールドがであるかどうかを確認する必要があります。volatileまた、プロデューサーは、valueがに設定されていることを確認trueしたら、キューを排出してからに設定すると想定しますfalse。競合状態が発生する可能性があるようです。上書きを避けるためにAtomicBoolean使用できるように、代わりに使用することを検討してください。compareAndSet(...)

これが問題であるかどうかはわかりませんが、次のような操作を行って、設定済み/未設定の競合状態の一部を削除することを検討する必要があります。これは必要ありませんAtomicBoolean

while (!vFlag.value) {
    // set to true immediately and then clear to avoid race
    vFlag.value = true;
    queue.clear();
}

BlockingQueue独自のを同期する代わりに、を使用することを検討することもできますQueueBlockingQueue■スレッドセーフを処理し、ロジックを実行するtake()メソッドもあります。wait()キューにアイテムがある場合、コードはブロックしたいようです。そのためnew LinkedBlockingQueue(1)、キューのサイズを1に制限してブロックするものを使用することをお勧めしますput(...)

最後に、これらの場合は、whileループの代わりにifループを使用することを常にお勧めします。

// always use while instead of if statements here
while (queue.size() == 0) {
    wait();
}

これにより、予期しないシグナルと複数のコンシューマーまたはプロデューサーの問題が解決されます。

于 2012-07-31T21:59:45.970 に答える
2

一般的なスレッド セーフ キューの場合はConcurrentLinkedQueueを使用できます。 または、一度に 1 つのオブジェクトを一度に 1 つのオブジェクトに制限したい場合はSynchronousQueueを使用 できますが、次のオブジェクトを事前に計算するにはArrayBlockingQueueを使用できます。

Java メモリー・モデルでは、値の効果的なキャッシュと、実行中のスレッドが 1 つだけの場合は並べ替えによって結果が変わらないと仮定したコンパイラーによる命令の並べ替えが可能になります。そのため、キャッシュされた値の使用が許可されているため、プロデューサーが vFlag を true として表示することは保証されません。したがって、コンシューマ スレッドは vflag を true と見なし、プロデューサーは vflag を false と見なします。変数を volatile として宣言するということは、JVM が毎回値を検索する必要があることを意味し、アクセスの両側でメモリ フェンスが生成され、並べ替えが妨げられます。例えば

public class vFlagClass {
  public volatile boolean value;
}

コンシューマーは、vFlag を true に設定した後も引き続きキューにアクセスできます。コンシューマーはキュー自体をクリアするか、キューがクリアされたというプロデューサーからのシグナルを待つ必要があります。値が揮発性であると仮定しても、vFlag.value が true に設定された後、プロデューサをアクティブ化してキューをクリアする前に、プログラムがキュー内のすべての入力を消費することは許容される操作です。

さらに、キュー構造は、1 つのスレッドのみが書き込みを行い、1 つのスレッドが読み取りを行うことを前提としています。プロデューサーにキューをクリアさせることで、プロデューサーは効果的にキューから読み取り、不変条件を破ることができます。

あなたのエラーについて具体的には、キューが空のときにキューをクリアするようにコンシューマーがプロデューサーに指示し、キューを再度試行すると、キューがまだ空であるため待機していると思います。次に、プロデューサーが起動し、キューがいっぱいになり、これによりコンシューマーが起動し、プロデューサーがキューをクリアし、コンシューマーが開始してキューをポーリングしますが、キューは空です。したがって、NULL が返されます。言い換えると

Producer : queue.add(str); // queue size is now 1
Producer : goes to end of loop
Consumer : String str = queue.poll(); // queue size is now 0
Consumer : process(str, vFlag); // vFlag.value is set to false 
Consumer : String str = queue.poll(); // waits at
            if(queue.size() == 0)
            {
                wait();
            }
Producer : queue.add(str);
Producer : notify() // wakes up consumer
Producer : queue.clear();
Consumer : String str = queue.poll(); // note queue is empty, so returns NULL

この場合、 PQueue.poll を次のように変更する必要があります。

String str = null;
while((str = queue.poll()) == null)
    wait();
return str;

ただし、前述の他のすべてのバグがまだ残っているため、これは良い解決策ではありません。注: これにより、2 つのスレッドのみがキューから読み取ることができます。複数のスレッドがキューに書き込むことは防止されません。

編集:別の競合状態に気づきました。プロデューサーは「終了」をキューに入れ、コンシューマーはそれを読み取って処理し、プロデューサーはexitFlagを更新します。exitFlag を変更しても、古いデータの読み取りとは関係がないため、何も修正されないことに注意してください。exitFlag を使用する代わりに、"end" を読み取ったときに Consumer も中断する必要があります。

Java でのメモリの一貫性について詳しくは、ドキュメントを参照してください。

于 2012-07-31T22:19:28.853 に答える