2

非同期で処理できる無制限のジョブキューがあります。各ジョブの処理により、このキューの新しいジョブの作成がトリガーされる場合とされない場合があります。

複数のワーカースレッドのプールでこのキューからアイテムを取得し、それらを並行して処理して、両方のキューが空になり、すべてのワーカースレッドがアイドル状態になり、キューで新しいジョブを待機するようにします(忙しいワーカーが新しいジョブを追加する可能性があるため)キューへのジョブ)。

java.util.concurrent労働者がプロデューサーでもあるこの特定の問題を解決するために使用できる実装を使用するためのレシピはありますか?このようなシナリオがAPIから直接サポートされているかどうかは明らかではありません。

特に、終了条件、つまり、使用可能なジョブがなくなったとき(ジョブキューが空)で、ジョブが生成されなくなったとき(すべてのアイドルワーカースレッド)を検出できるようにしたいです。

編集

以下のナムサンの答えは、基本的に、提出されたジョブの数と完了したジョブの数を追跡し、これらの数が等しい場合を終了条件として使用するという、最も洗練されたアプローチのようです。

java.util.concurrentこれを実現するために拡張された実装を使用して完全な例を実装しました。さらに、特定の方法で並べ替えられたインスタンスをThreadPoolExecutor受け入れるようにジョブキューを特殊化しました。Comparable

  • TestExecutor.java:拡張ThreadPoolExecutorするが、新しいジョブを作成する可能性のあるジョブを実行するための追加のメソッドと、送信されたすべてのジョブが完了するまで待機する新しいawaitメソッドを持つカスタムエグゼキューター。
  • WorkUnit.java:に送信する新しいジョブを作成する可能性のある同等の実行可能なジョブの例TestExecutor
  • Test.java:。WorkUnitを含むインスタンスを使用して例を実行するためのmainメソッドが含まれていますTestExecutor
4

5 に答える 5

1

生産者/消費者パターンでは完全に別個の関心事であるため、消費者が生産者でもあることは適切ではないと思います。

コンシューマーはすでにキューへの参照を持っています。プロデューサーのようにキューに追加するだけです。

AtomicIntegerまたはを使用して、現在アクティブなワーカーの数を記録するか、CountDownLatchすべてが静止するまで待機する場合はaを使用できます。

于 2012-10-22T06:40:58.860 に答える
1

私はこのタイプの問題に対するいくつかの異なる解決策を見てきました。

1つはpoll、コードの場合と同様に、メインスレッドでブロッキング呼び出しとして使用しますが、ワーカーから「ダミー」オブジェクトをキューに入れて、メインスレッドが永久に待機する可能性がある場合に、メインスレッドをウェイクアップします。たとえば、キュ​​ーにアイテムを追加せずに終了したワーカーは、ダミージョブを送信する必要があります。ダミージョブは、メインスレッドが認識して無視します(メインスレッドをウェイクアップするためだけに機能します)。アクティブなジョブの数を追跡することで、作成するダミーオブジェクトの数を減らすことができます。したがって、複雑さを犠牲にして「スプリアスウェイクアップ」を減らすことができます。ダミーオブジェクトを追加する必要があるのは、最後のジョブだけです。

別のメソッドは、別のオブジェクトを待機することです。たとえば、どんな古いものでもかまいObjectません。wait()このオブジェクトにメインスレッドを設定します。Object.notify()次に、ジョブは、完了するたびにこのスレッドをウェイクアップします。繰り返しになりますが、カウントすると、必要な通知の数を減らすことができます。

最も洗練された解決策は、を使用することかもしれませんSemaphore。基本的に、セマフォの値は、「飛行中のジョブ+キューアイテム」の数の負の値になります。この値は、ジョブがキューからアイテムをピックアップしても変更されません(フライト中のジョブは1つ増加し、キューアイテムは1つ減少するため)が、すべてのジョブは、追加するすべてのジョブに対してreducePermits()を呼び出す必要があります。終了する前に1回のrelease()呼び出し。

その後、メインスレッドacquire()は、作業中はブロックするだけです。ウェイクアップすると、すべてが完了します(飛行中+キューに入れられた作業がゼロであるため)。別のスレッドを開始して、実際にpoll呼び出しを実行し、ジョブを追加します(現在、メインスレッドによって実行されます)。このワーカーは、メインスレッドのが戻ったときにシャットダウンできacquireます。poll()ただし、終了するよりも、既存のワーカーを自分で作成する方が簡単な場合があります。そうすれば、この伝達関数はまったく必要ありません。

実際、このSemaphoreソリューションでは、キューを完全に省略して、エグゼキュータに組み込まれているキューを使用してみませんか?つまり、労働者は経由して新しい仕事を提出しexecutor.submit(newJob(nextJob))ますか?内部的には、エグゼキュータスレッドはとにかくブロッキングキューから作業を引き出しているため、明示的な外部キューを持つことにはいくつかの重複があります。

于 2012-10-24T07:27:26.550 に答える
1

数年前、私は似たようなことをしなければなりませんでしたが、スタックは制限されていました。私は可能な解決策を共有します:

idle_thread = MAX_THREAD;
do
{
    if(queue != empty) // If thread have work to do
    {
       idle_threads--;  // Count this threads was a worker   
       flag = true;
       while(queue != empty)  // Until queue have work
       {
          synchronized(this)
          {
            // task =  take_out_of_queue;
          }
        }
   }
   if(flag) // This flag must to be local to each thread, it is use to insure 
   {        // that threads will count this only one time for each time 
          // the queue got empty
         synchronized(this)
         {
            if(flag == false)
            idle_threads++;  // Count thread as a idle one
            flag = false;
         }
     }
     if(idle_threads == MAX_THREADS) out = true; // When all threads are idle stop the work loop
} while(!out)
于 2012-10-26T22:25:14.237 に答える
1

DirectoryScannerに関する私の投稿を参照してください。ほとんどの要件を満たしています。しかし、FuturesとCallableでは実装されていません。それを考えなければなりません。各タスクは重要視されていません。結果はなく、例外が生成されます。ファイルをスキャンするための並列で再帰的な方法です。

于 2012-10-27T04:18:11.427 に答える
1

以下のコードは、ラッパークラスを使用してExecutor、送信されたジョブの数をカウントし、それを完了したジョブの数と比較して、目的を達成する方法を示しています。タスクexecuteはラッパークラスのメソッドを呼び出す必要があり、基になるものExecutorを直接呼び出さないようにする必要があることに注意してください。必要に応じて、以下のラッパーを拡張して、ifの「submit」メソッドをラップするのは簡単ExecutorServiceです。

public class ExampleExecutor {

    private final Executor executor;
    private long submitCount = 0;
    private long doneCount = 0;

    public ExampleExecutor(Executor executor) {
        this.executor = executor;
    }

    public synchronized void execute(Collection<Runnable> commands) {
        for (Runnable command : commands) {
            execute(command);
        }
    }

    public synchronized void execute(final Runnable command) {
        submitCount ++;

        executor.execute(new Runnable() {
            public void run() {
                try {
                    command.run();
                } finally {
                    synchronized (ExampleExecutor.this) {
                        doneCount++;
                        if (doneCount == submitCount) {
                            ExampleExecutor.this.notifyAll();
                        }
                    }
                }
            }
        });
    }

    public synchronized void awaitCompletion() throws InterruptedException {
        while (doneCount != submitCount) {
            this.wait();
        }
    }
}

編集:上記のコードをどのように使用できるかを示すために、以下にテストケースを追加しました

public class Test {

    static class Task implements Runnable {
        private final String id;
        private final long repetitions;
        private final long respawnSize;
        private final ExampleExecutor executor;

        public Task(String id, long repetitions, long respawnSize, ExampleExecutor executor) {
            this.id = id;
            this.repetitions = repetitions;
            this.respawnSize = respawnSize;
            this.executor = executor;
        }

        public void run() {
            for (int i = 0; i < respawnSize; i ++) {
                // Spawning new sub tasks
                executor.execute(new Task(id + "-" + i, repetitions/2, 0, null));
            }

            double sum = 0;
            for (int i = 0; i < repetitions; i++) {
                sum += Math.sin(i);
            }

            System.err.println(id + " completed at " + System.currentTimeMillis() + " with sum=" + sum);
        }
    }

    public static void main(String argv[]) throws InterruptedException {
        ExampleExecutor executor = new ExampleExecutor(Executors.newFixedThreadPool(2));
        executor.execute(new Task("0", 2000000, 100, executor));

        System.err.println("main thread awaits completion");
        executor.awaitCompletion();
        System.err.println("main thread recieved completion event");
    }
}
于 2012-10-28T09:08:58.030 に答える