アクティブなスレッドの数を維持します。
public class ThreadCounter {
public static final AtomicInteger threadCounter = new AtomicInteger(N);
public static final AtomicInteger queueCounter = new AtomicInteger(0);
public static final Object poisonPill = new Object();
public static volatile boolean cancel = false; // or use a final AomticBoolean instead
}
スレッドのポーリング ループは次のようになります (使用していると想定していますBlockingQueue) 。
while(!ThreadCounter.cancel) {
int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking
if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) {
ThreadCounter.cancel = true;
queue.offer(ThreadCounter.poisonPill);
} else {
Object obj = queue.take();
ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking
ThreadCounter.queueCounter.decrementAndGet();
if(obj == ThreadCounter.poisonPill) {
queue.offer(obj); // send the poison pill back through the queue so the other threads can read it
continue;
}
}
}
スレッドがブロックしようとしている場合はBlockingQueue、カウンターをデクリメントします。すべてのスレッドがすでにキューで待機している場合 (つまりcounter == 0)、最後のスレッドcancelが true に設定され、キューを介してポイズン ピルを送信して他のスレッドを起動します。各スレッドはポイズン ピルを確認し、それをキューに送り返して残りのスレッドをウェイクアップし、それがcanceltrue に設定されていることを確認するとループを終了します。
編集:キュー内のオブジェクト数のカウントを維持するを追加して、データ競合を取り除きましたqueueCounter(明らかに、オブジェクトをキューに追加するqueueCounter.incrementAndGet()場所に呼び出しを追加する必要もあります)。これは次のように機能します: if threadCount == 0, butqueueCount != 0の場合、これは、スレッドがアイテムをキューから削除したばかりで、まだ を呼び出していないthreadCount.getAndIncrementため、cancel 変数がtrue に設定されていないことを意味します。threadCount.getAndIncrement呼び出しが呼び出しに先行することが重要queueCount.getAndDecrementです。そうしないと、データ競合が発生します。これを呼び出しqueueCount.getAndIncrementでインターリーブしないため、呼び出す順序は重要ではありませんthreadCount.getAndDecrement(後者はループの最後に呼び出され、前者はループの最初に呼び出されます)。
プロセスをいつ終了するかを決定するためにa を使用することはできないことに注意してください。これqueueCountは、キューにデータをまだ配置していないスレッドがまだアクティブである可能性があるためqueueCountです。スレッドは現在の反復を終了しました。
キューを介して繰り返し送信するpoisonPill代わりに、キャンセル スレッドにキューを介して (N-1) を送信させることができpoisonPillsます。別のキューを使用してこのアプローチを使用する場合は注意してください。一部のキュー (Amazon の Simple Queue Service など) は、それらのtakeメソッドと同等の方法で複数のアイテムを返す可能性があるpoisonPillためです。シャットダウンします。
さらに、ループを使用する代わりに、ループを使用して、ループが検出されたときにブレークするwhile(!cancel)ことができます。while(true)poisonPill