55

実行中の非同期タスクはほとんどなく、そのうちの少なくとも 1 つが終了するまで待つ必要があります (将来的には、N 個のタスクのうち util M 個が終了するのを待つ必要があるでしょう)。現在、それらはFutureとして提示されているため、次のようなものが必要です

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

このようなことはありますか?または同様のもの、Future には必要ありません。現在、先物のコレクションをループし、1 つが終了したかどうかを確認してから、しばらくスリープしてからもう一度確認します。これは最善の解決策ではないように見えます。長時間スリープすると不要な遅延が追加され、短時間スリープするとパフォーマンスに影響する可能性があるためです。

使ってみることができました

new CountDownLatch(1)

タスクが完了するとカウントダウンを減らして実行します

countdown.await()

、しかし、未来の創造を制御する場合にのみ可能であることがわかりました。可能ですが、システムの再設計が必要です。現在、タスク作成のロジック (Callable を ExecutorService に送信する) は、どの Future を待つかの決定から分離されているためです。オーバーライドすることもできました

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

RunnableFuture のカスタム実装を作成し、タスクが終了したときに通知されるようにリスナーをアタッチし、そのようなリスナーを必要なタスクにアタッチして CountDownLatch を使用しますが、これは、使用する ExecutorService ごとに newTaskFor をオーバーライドする必要があることを意味します。 AbstractExecutorService を拡張しません。同じ目的で指定された ExecutorService をラップすることもできますが、Future を生成するすべてのメソッドをデコレートする必要があります。

これらの解決策はすべてうまくいくかもしれませんが、非常に不自然に見えます。次のような単純なものが欠けているようです

WaitHandle.WaitAny(WaitHandle[] waitHandles)

C#で。この種の問題に対するよく知られた解決策はありますか?

アップデート:

もともと私は Future の作成にまったくアクセスできなかったので、エレガントな解決策はありませんでした。システムを再設計した後、Future の作成にアクセスし、実行プロセスに countDownLatch.countdown() を追加できたので、countDownLatch.await() を実行すると、すべて正常に動作します。他の回答に感謝します。私は ExecutorCompletionService について知りませんでした。実際に同様のタスクで役立つ可能性がありますが、この特定のケースでは、一部の Future がエグゼキューターなしで作成されるため、使用できませんでした。実際のタスクはネットワーク経由で別のサーバーに送信されます。リモートで完了し、完了通知を受け取ります。

4

8 に答える 8

55

簡単です、 ExecutorCompletionServiceをチェックしてください。

于 2008-09-22T23:42:28.200 に答える
9

ExecutorService.invokeAny

于 2008-09-23T03:45:41.597 に答える
7

結果キューを作成してキューで待機しないのはなぜですか? または、より単純に、ExecutorService + 結果キューであるため、CompletionService を使用します。

于 2008-09-23T01:42:18.070 に答える
6

これは実際には、wait() と notifyAll() を使用すると非常に簡単です。

まず、ロック オブジェクトを定義します。(これには任意のクラスを使用できますが、明示的にするのが好きです):

package com.javadude.sample;

public class Lock {}

次に、ワーカー スレッドを定義します。処理が終了したら、そのロック オブジェクトに通知する必要があります。通知は、ロック オブジェクトの同期ブロック ロック内にある必要があることに注意してください。

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

最後に、クライアントを次のように記述できます。

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}
于 2008-09-22T23:08:26.583 に答える
4

私の知る限り、Java にはWaitHandle.WaitAnyメソッドに類似した構造はありません。

これは、「WaitableFuture」デコレーターを介して実現できるように思えます。

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

これは、実行コードの前に挿入できる場合にのみ機能しますが、そうしないと、実行コードに新しいdoTask()メソッドが含まれないためです。しかし、実行前に何らかの方法で Future オブジェクトを制御できない場合、ポーリングせずにこれを行う方法は本当にありません。

または、未来が常に独自のスレッドで実行され、何らかの方法でそのスレッドを取得できる場合。次に、新しいスレッドを生成して他のスレッドに参加し、結合が戻った後に待機メカニズムを処理できます...これは本当に見苦しく、多くのオーバーヘッドを引き起こします。また、一部の Future オブジェクトが終了しない場合、デッド スレッドによっては多くのスレッドがブロックされる可能性があります。注意しないと、メモリとシステム リソースがリークする可能性があります。

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}
于 2008-09-22T21:18:03.853 に答える
1

CompletableFuture代わりに sを使用できる場合CompletableFuture.anyOfは、結果に対して join を呼び出すだけで、必要なことを行うことができます。

CompletableFuture.anyOf(futures).join()

またはメソッドCompletableFutureを呼び出すことにより、エグゼキューターで s を使用できます。CompletableFuture.supplyAsyncrunAsync

于 2020-10-28T08:03:18.880 に答える
0

どちらが終了するかは気にしないので、すべてのスレッドに対して 1 つの WaitHandle を用意して、それを待機しないのはなぜですか? どちらが先に終了しても、ハンドルを設定できます。

于 2008-09-22T21:18:44.957 に答える
-1

このオプションを参照してください。

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}
于 2009-01-08T10:02:17.807 に答える