3

Javaで負荷テストアプリケーションを作成していて、テスト対象のサーバーに対してタスクを実行するスレッドプールがあります。したがって、1000個のジョブを作成し、それらを5つのスレッドで実行するには、次のようにします。

    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<Runnable> jobs = makeJobs(1000);
    for(Runnable job : jobs){
        pool.execute(job);
    }

ただし、すべての「ジョブ」オブジェクトを事前に作成し、必要になるまでそれらをメモリに保持する必要があるため、このアプローチはあまりうまく拡張できないと思います。

新しいジョブが必要になるたびにプール内のスレッドをある種の「JobFactory」クラスに移動させ、必要な数のジョブが実行されるまでファクトリが要求に応じてRunnablesを構築する方法を探しています。ファクトリは「null」を返し始めて、これ以上行う作業がないことをスレッドに通知することができます。

このようなものを手作業でコーディングすることもできますが、それは十分に一般的なユースケースのようであり、代わりに使用できる素晴らしいが複雑な「java.util.concurrent」パッケージに何かあるかどうか疑問に思っていました。

4

2 に答える 2

5

AtomicInteger を使用してスレッド プールの実行中のスレッドですべての作業を実行し、実行されたランナブルの数を監視できます。

 int numberOfParties = 5;
 AtomicInteger numberOfJobsToExecute = new AtomicInteger(1000);
 ExecutorService pool = Executors.newFixedThreadPool(numberOfParties );
 for(int i =0; i < numberOfParties; i++){
     pool.submit(new Runnable(){
        public void run(){
            while(numberOfJobsToExecute.decrementAndGet() >= 0){
                makeJobs(1).get(0).run();
            }
        }
     });
 }

返された Future を List に保存し、get()(他のメカニズムの中でも) 完了を待つこともできます。

于 2012-03-06T20:10:07.687 に答える
4

うーん。容量を固定して を作成し、BlockingQueue<Runnable>各ワーカー スレッドをキューから取り出してRunnable実行することができます。次に、ジョブをキューに入れるプロデューサースレッドを作成できます。

メインスレッドは次のようになります。

// 100 is the capacity of the queue before blocking
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(100);
// start the submitter thread
new Thread(new JobSubmitterThread(queue)).start();
// make in a loop or something?
new Thread(new WorkerThread(queue)).start();
new Thread(new WorkerThread(queue)).start();
...

ワーカーは次のようになります。

public class WorkerThread implements Runnable {
     private final BlockingQueue<Runnable> queue;
     public WorkerThread(BlockingQueue<Runnable> queue) {
         this.queue = queue;
     }
     public void run() {
         // run until the main thread shuts it down using volatile boolean or ...
         while (!shutdown) {
             Runnable job = queue.take();
             job.run();
         }
     }
}

ジョブ送信者は次のようになります。

 public class JobSubmitterThread implements Runnable {
     private final BlockingQueue<Runnable> queue;
     public WorkerThread(BlockingQueue<Runnable> queue) {
         this.queue = queue;
     }
     public void run() {
         for (int jobC = 0; jobC < 1000; jobC++) {
             Runnable job = makeJob();
             // this would block when the queue reaches capacity
             queue.put(job);
         }
     }
 }
于 2012-03-06T20:05:03.193 に答える