28

ExecutorServiceプロデューサースレッドがN個の作業項目を生成し、それらをに送信してから、 N個の項目がすべて処理されるまで待機する必要があるという問題が2回発生しました。

警告

  • Nは事前にはわかりません。もしそうなら、私は単にを作成し、すべての作業が完了するまでCountDownLatchプロデューサースレッドを作成します。await()
  • 私のプロデューサースレッドはブロックする必要がありますが(つまり、を呼び出すことによって)、プロデューサースレッドの待機を停止させるために、すべての作業が完了したことを通知する方法がないため、を使用することCompletionServiceは不適切です。take()

私の現在の好ましい解決策は、整数カウンターを使用し、作業項目が送信されるたびにこれをインクリメントし、作業項目が処理されるときにそれをデクリメントすることです。N個のタスクすべてを送信した後、プロデューサースレッドはロックを待機し、counter == 0通知されるたびにチェックする必要があります。コンシューマースレッドは、カウンターがデクリメントされ、新しい値が0の場合、プロデューサーに通知する必要があります。

この問題へのより良いアプローチはありjava.util.concurrentますか、それとも「自分でローリング」するのではなく、使用すべき適切な構成がありますか?

前もって感謝します。

4

6 に答える 6

31

java.util.concurrent.Phaserそれはあなたにとってうまくいくようです。Java 7 でのリリースが計画されていますが、最も安定したバージョンはjsr166のインタレスト グループ Web サイトにあります。

フェイザーは栄光のサイクリック バリアです。N個のパーティーを登録し、準備ができたら特定のフェーズで彼らの前進を待ちます。

それがどのように機能するかの簡単な例:

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}
public void doWork(){
    phaser.register();//register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution 
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}
于 2009-10-28T12:42:28.880 に答える
2

もちろん、タスクが次のようにラップCountDownLatchされるように、によって保護されたを使用できます。AtomicReference

public class MyTask extends Runnable {
    private final Runnable r;
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }

    public void run() {
        r.run();
        while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
        l.get().countDown();
    }
}

タスクが作業を実行し、カウントダウンが設定されるまで (つまり、タスクの総数がわかるまで)スピンすることに注意してください。カウントダウンが設定されるとすぐに、カウントダウンして終了します。これらは次のように送信されます。

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));

作品を作成/提出した後、作成したタスクの数がわかっている場合:

latch.set(new CountDownLatch(nTasks));
latch.get().await();
于 2009-10-28T10:08:07.240 に答える
1

私は ExecutorCompletionService を次のようなものに使用しました:

ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
    executor.submit(new Processor());
    count++;
}

//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
    executor.take().get();
}

これには、送信されたタスクのキューを保持することが含まれるため、リストが任意に長い場合は、この方法が適している可能性があります。

AtomicIntegerただし、1 つのスレッドでインクリメントし、ワーカー スレッドでデクリメントできるように、調整には必ず を使用してください。

于 2009-10-28T10:10:39.707 に答える
0

プロデューサーは、キューが空であることを知る必要はありませんが、最後のタスクがいつ完了したかを知る必要があると思います。

waitforWorkDone(producer)コンシューマーにメソッドを追加します。プロデューサは N 個のタスクを追加し、wait メソッドを呼び出すことができます。ワーク キューが空ではなく、現在実行中のタスクがない場合、wait メソッドは着信スレッドをブロックします。

notifyAll()タスクが終了し、キューが空で、他のタスクが実行されていない場合、waitfor ロックのコンシューマー スレッド。

于 2009-10-28T10:08:56.400 に答える