5

計算する要素のキューを共有する N 個のワーカーがあります。各反復で、各ワーカーはキューから要素を削除し、計算する要素をさらに生成して同じキューに入れることができます。基本的に、各生産者は消費者でもあります。キューに要素がなく、すべてのワーカーが現在の要素の計算を終了すると、計算は終了します (したがって、計算する要素をこれ以上生成することはできません)。ディスパッチャー/コーディネーターを避けたいので、ワーカーは調整する必要があります。ワーカーが停止条件が有効かどうかを確認し、他のワーカーに代わって計算を停止できるようにするための最適なパターンは何ですか?

たとえば、すべてのスレッドがこのサイクルを実行すると、要素がすべて計算されると、すべてのスレッドが永久にブロックされることになります。

while (true) {
    element = queue.poll();
    newElements[] = compute(element);
    if (newElements.length > 0) {
        queue.addAll(newElements);
    }
}
4

1 に答える 1

6

アクティブなスレッドの数を維持します。

public class ThreadCounter {
    public static final AtomicInteger threadCounter = new AtomicInteger(N);
    public static final AtomicInteger queueCounter = new AtomicInteger(0);
    public static final Object poisonPill = new Object();
    public static volatile boolean cancel = false; // or use a final AomticBoolean instead
}

スレッドのポーリング ループは次のようになります (使用していると想定していますBlockingQueue) 。

while(!ThreadCounter.cancel) {
    int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking
    if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) {
        ThreadCounter.cancel = true;
        queue.offer(ThreadCounter.poisonPill);
    } else {
        Object obj = queue.take();
        ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking
        ThreadCounter.queueCounter.decrementAndGet();
        if(obj == ThreadCounter.poisonPill) {
            queue.offer(obj); // send the poison pill back through the queue so the other threads can read it
            continue;
        }
    }
}

スレッドがブロックしようとしている場合はBlockingQueue、カウンターをデクリメントします。すべてのスレッドがすでにキューで待機している場合 (つまりcounter == 0)、最後のスレッドcancelが true に設定され、キューを介してポイズン ピルを送信して他のスレッドを起動します。各スレッドはポイズン ピルを確認し、それをキューに送り返して残りのスレッドをウェイクアップし、それがcanceltrue に設定されていることを確認するとループを終了します。

編集:キュー内のオブジェクト数のカウントを維持するを追加して、データ競合を取り除きましたqueueCounter(明らかに、オブジェクトをキューに追加するqueueCounter.incrementAndGet()場所に呼び出しを追加する必要もあります)。これは次のように機能します: if threadCount == 0, butqueueCount != 0の場合、これは、スレッドがアイテムをキューから削除したばかりで、まだ を呼び出していないthreadCount.getAndIncrementため、cancel 変数がtrue に設定されていないことを意味します。threadCount.getAndIncrement呼び出しが呼び出しに先行することが重要queueCount.getAndDecrementです。そうしないと、データ競合が発生します。これを呼び出しqueueCount.getAndIncrementでインターリーブしないため、呼び出す順序は重要ではありませんthreadCount.getAndDecrement(後者はループの最後に呼び出され、前者はループの最初に呼び出されます)。

プロセスをいつ終了するかを決定するためにa を使用することはできないことに注意してください。これqueueCountは、キューにデータをまだ配置していないスレッドがまだアクティブである可能性があるためqueueCountです。スレッドは現在の反復を終了しました。

キューを介して繰り返し送信するpoisonPill代わりに、キャンセル スレッドにキューを介して (N-1) を送信させることができpoisonPillsます。別のキューを使用してこのアプローチを使用する場合は注意してください。一部のキュー (Amazon の Simple Queue Service など) は、それらのtakeメソッドと同等の方法で複数のアイテムを返す可能性があるpoisonPillためです。シャットダウンします。

さらに、ループを使用する代わりに、ループを使用して、ループが検出されたときにブレークするwhile(!cancel)ことができます。while(true)poisonPill

于 2013-05-16T16:36:09.707 に答える