72

いくつかのタスクオブジェクトを作成するシングルスレッドプロデューサーがあり、ArrayBlockingQueueそれらは(固定サイズの)に追加されます。

また、マルチスレッドのコンシューマーを開始します。これは、固定スレッドプール(Executors.newFixedThreadPool(threadCount);)としてビルドされます。次に、いくつかのConsumerWorkerインスタンスをこのthreadPoolに送信します。各ConsumerWorkerは、上記のArrayBlockingQueueインスタンスを参照します。

そのような各ワーカーはtake()キューで実行し、タスクを処理します。

私の問題は、これ以上行うべき作業がなくなるときに、ワーカーに知らせるための最良の方法は何であるかということです。つまり、プロデューサーがキューへの追加を終了したことをワーカーに通知するにはどうすればよいですか。この時点から、各ワーカーはキューが空であることを確認したら停止する必要があります。

私が今持っているのは、私のプロデューサーが(キューに何かを追加するという)仕事を終えたときにトリガーされるコールバックで初期化されるセットアップです。また、作成してThreadPoolに送信したすべてのConsumerWorkerのリストも保持しています。プロデューサーコールバックがプロデューサーが完了したことを私に告げるとき、私はこれを各労働者に伝えることができます。この時点で、キューが空でないかどうかを確認し続ける必要があります。キューが空になると停止するため、ExecutorServiceスレッドプールを正常にシャットダウンできます。こんな感じです

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

ここでの問題は、プロデューサーが終了してシグナルを送信し、ConsumerWorkersがキュー内のすべてを消費する前に停止するという明らかな競合状態があることです。

私の質問は、これを同期してすべてが正常に機能するようにするための最良の方法は何ですか?プロデューサーが実行されているかどうかをチェックし、キューが空であるかどうかをチェックし、さらに1つのブロック(キューオブジェクト上)のキューから何かを取得する部分全体を同期する必要がありますか?isRunningConsumerWorkerインスタンスのブール値の更新を同期する必要がありますか?他に何か提案はありますか?

更新、これが私が使用することになった実用的な実装です:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

これは、いくつかの小さな変更を加えただけで、以下のJohnVintの回答に大きく影響を受けました。

===@vendhanのコメントによる更新。

どうぞよろしくお願いいたします。そうです、この質問の最初のコードスニペットには(他の問題の中でも)while(isRunning || !inputQueue.isEmpty())実際には意味がないものがあります。

これの実際の最終的な実装では、「||」を置き換えるというあなたの提案に近いことをします。(または)「&&」(および)を使用すると、各ワーカー(消費者)は、リストから取得した要素が毒薬であるかどうかのみをチェックし、停止した場合は停止します(理論的には、ワーカーは実行中であり、キューが空であってはなりません)。

4

6 に答える 6

87

take()キューから続行する必要があります。ポイズンピルを使用して、作業者に停止するように指示できます。例えば:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}
于 2012-01-23T16:10:46.230 に答える
14

シャットダウンする必要があることを通知するために、ワーカーに特別な作業パケットを送信します。

public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

ワーカーを停止するConsumerWorker.DONEには、キューに追加するだけです。

于 2012-01-23T16:15:21.800 に答える
1

キューから要素を取得しようとするコードブロックで、poll(time,unit)の代わりにを使用しますtake()

try { 
    Object queueElement = inputQueue.poll(timeout,unit);
     //process queueElement        
 } catch (InterruptedException e) {
        if(!isRunning && queue.isEmpty())
         return ; 
 } 

タイムアウトの適切な値を指定することにより、不幸なシーケンスが発生した場合にスレッドがブロックし続けないようにします。

  1. isRunning本当です
  2. キューが空になるため、スレッドはブロックされた待機に入ります(使用している場合)take()
  3. isRunningfalseに設定されています
于 2012-01-23T16:47:54.667 に答える
1

CountDownLatchサイズがプロデューサーのレコード数である、を使用してそれを行うことはできません。そして、すべての消費者はcountDown、レコードを処理した後になります。そして、awaits()すべてのタスクが終了すると、メソッドと交差します。次に、すべての消費者を停止します。すべてのレコードが処理されるため。

于 2017-11-15T06:40:14.090 に答える
0

使用できる戦略はいくつかありますが、簡単な方法の1つは、ジョブの終了を通知するタスクのサブクラスを作成することです。プロデューサーはこのシグナルを直接送信しません。代わりに、このタスクサブクラスのインスタンスをエンキューします。コンシューマーの1つがこのタスクを実行して実行すると、シグナルが送信されます。

于 2012-01-23T16:11:10.123 に答える
0

マルチスレッドのプロデューサーとマルチスレッドのコンシューマーを使用する必要がありました。最終的にはScheduler -- N Producers -- M Consumers、それぞれ2つがキューを介して通信するスキーム(合計2つのキュー)になりました。スケジューラーは、最初のキューをデータ生成の要求で満たし、次にN個の「ポイズンピル」で満たします。アクティブなプロデューサー(atomic int)のカウンターがあり、最後のポイズンピルを受け取った最後のプロデューサーがM個のポイズンピルをコンシューマーキューに送信します。

于 2017-02-13T09:32:18.917 に答える