いくつかのタスクオブジェクトを作成するシングルスレッドプロデューサーがあり、ArrayBlockingQueue
それらは(固定サイズの)に追加されます。
また、マルチスレッドのコンシューマーを開始します。これは、固定スレッドプール(Executors.newFixedThreadPool(threadCount);
)としてビルドされます。次に、いくつかのConsumerWorkerインスタンスをこのthreadPoolに送信します。各ConsumerWorkerは、上記のArrayBlockingQueueインスタンスを参照します。
そのような各ワーカーはtake()
キューで実行し、タスクを処理します。
私の問題は、これ以上行うべき作業がなくなるときに、ワーカーに知らせるための最良の方法は何であるかということです。つまり、プロデューサーがキューへの追加を終了したことをワーカーに通知するにはどうすればよいですか。この時点から、各ワーカーはキューが空であることを確認したら停止する必要があります。
私が今持っているのは、私のプロデューサーが(キューに何かを追加するという)仕事を終えたときにトリガーされるコールバックで初期化されるセットアップです。また、作成してThreadPoolに送信したすべてのConsumerWorkerのリストも保持しています。プロデューサーコールバックがプロデューサーが完了したことを私に告げるとき、私はこれを各労働者に伝えることができます。この時点で、キューが空でないかどうかを確認し続ける必要があります。キューが空になると停止するため、ExecutorServiceスレッドプールを正常にシャットダウンできます。こんな感じです
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
ここでの問題は、プロデューサーが終了してシグナルを送信し、ConsumerWorkersがキュー内のすべてを消費する前に停止するという明らかな競合状態があることです。
私の質問は、これを同期してすべてが正常に機能するようにするための最良の方法は何ですか?プロデューサーが実行されているかどうかをチェックし、キューが空であるかどうかをチェックし、さらに1つのブロック(キューオブジェクト上)のキューから何かを取得する部分全体を同期する必要がありますか?isRunning
ConsumerWorkerインスタンスのブール値の更新を同期する必要がありますか?他に何か提案はありますか?
更新、これが私が使用することになった実用的な実装です:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
これは、いくつかの小さな変更を加えただけで、以下のJohnVintの回答に大きく影響を受けました。
===@vendhanのコメントによる更新。
どうぞよろしくお願いいたします。そうです、この質問の最初のコードスニペットには(他の問題の中でも)while(isRunning || !inputQueue.isEmpty())
実際には意味がないものがあります。
これの実際の最終的な実装では、「||」を置き換えるというあなたの提案に近いことをします。(または)「&&」(および)を使用すると、各ワーカー(消費者)は、リストから取得した要素が毒薬であるかどうかのみをチェックし、停止した場合は停止します(理論的には、ワーカーは実行中であり、キューが空であってはなりません)。