3

以下のコード スニペットがあり、正常に動作します。しかし、問題は、2000 を超えるタスクをすぐに作成してエグゼキューター キューに入れることです。

エグゼキューター キューに既にあるタスクが完了しているかどうかを確認する必要があります。その後、追加のタスクを与えます。正確である必要はありません。つまり、キューに残っているタスクが 10 個未満の場合は、さらに 50 個追加します。

そのため、エグゼキュータ タスク キューには保留中のタスクがそれほど多くないため、shutdown() をタイムリーに動作させることもできます。それ以外の場合は、呼び出されたとしても、エグゼキュータはキュー内の 2000 個のタスクすべてを最初に完了しようとします。

これを達成するための最良の方法は何ですか?ありがとうございました

executor = Executors.newFixedThreadPool(numThreads);

while(some_condition==true)
{
    //if(executor < 10 tasks pending)  <---- how do i do this?
    //{                             
        for(int k=0;k<20;k++)
        {  
            Runnable worker = new MyRunnable();
            executor.execute(worker);
        }
    //}
    //else 
    //{
    //      wait(3000);
    //}
} 

セマフォを使用して更新します。

private final Semaphore semaphore = new Semaphore(10)
executor = new ThreadPoolExecutorWithSemaphoreFromJohnExample();

while(some_condition==true)
{

        Runnable worker = new MyRunnable();
        //So at this point if semaphore is full, then while loop would PAUSE(??) until
        //semaphore frees up again.
          executor.execute(worker);   
} 
4

4 に答える 4

8

以下のコード スニペットがあり、正常に動作します。しかし、問題は、2000 を超えるタスクをすぐに作成してエグゼキューター キューに入れることです。

これを行う 1 つの方法はThreadPoolExecutor、限定されたジョブ キューを使用して独自のジョブ キューを作成し、それにカスタムを設定するRejectedExecutionHandlerことです。これにより、キューに入れるジョブの数を細かく制御できます。

デフォルトでは、キューがいっぱいの場合ThreadPoolExecutor.submit(...)RejectedExecutionException. 以下のカスタム ハンドラーでは、キューによって拒否された場合、拒否ハンドラーはそれを元に戻し、キューに空きができるまでブロックします。したがって、ジョブが拒否/ドロップされることはありません。

独自のスレッドプールを開始し、独自の拒否ハンドラーを設定するおおよその方法は次のとおりです。

// you can tune the blocking queue size which is the number of jobs to queue
// when the NUM_THREADS are all working
final BlockingQueue<MyRunnable> queue =
    new ArrayBlockingQueue<MyRunnable>(NUM_JOBS_TO_QUEUE);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the job that fills the queue, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full as opposed to throwing
      executor.getQueue().put(r);
   }
});
...
// now submit all of your jobs and it will block if the queue is full
for(int k = 0; k < 20000000; k++) {  
   Runnable worker = new MyRunnable();
   threadPool.execute(worker);
}

スレッドプールのブロックの詳細については、こちらの回答を参照してください。

処理する必要があるデータが多すぎる場合、ThreadPoolExecutor コマンドを待機させるにはどうすればよいですか?

ThreadPoolExecutor.CallerRunsPolicyジョブをスレッドプールに送信している呼び出し元にジョブを実行させる を使用することもできます。ただし、このソリューションは好きではありません。ジョブが終了するまで呼び出し元をブロックし、他のワーカー スレッドを枯渇させる可能性があるためです。また、サブミッターが複数ある場合でも、ジョブを実行するスレッドが多すぎる可能性があります。

ThreadPoolExecutor最後に、コアと最大スレッド数を同じ数に設定していることに注意してください。残念ながら、デフォルトでは、executor はコア スレッドを開始し、キューをいっぱいにしてから、追加のスレッドを最大数まで割り当てます。これは完全に直感に反します。

于 2012-07-19T20:32:39.397 に答える
7

シンプルなセマフォを使用できます。提出時に新しい許可を取得し、完了後に許可を解放して、他の人が提出を待っていることを許可します。

private final Semaphore semaphore = new Semaphore(10);//or however you want max queued at any given moment
ThreadPoolExecutor tp= new ThreadPoolExecutor(...){
      public void execute(Runnable r){
          semaphore.acquire();
          super.execute(r);
      }    
      public void afterExecute(Runnable r, Thread t){
         semaphore.release();  
         super.afterExecute(r,t);
      }
};

そのため、使用可能な許可がこれ以上ない場合、送信スレッドは一時停止されます。

于 2012-07-19T21:04:10.387 に答える
3

私は通常、タスク オブジェクトのオブジェクト「プール キュー」 (起動時に X タスクでいっぱいになる BlockingQueue) を使用して、このようなシステムを調整します。タスクをスレッドに送信する場合は、プール キューからタスクを取得し、データをロードしてから送信する必要があります。

タスクが完了して処理されると、再利用のためにプール キューに戻されます。

プールが空になると、スレッドの送信は、いくつかのタスクが返されるまでプール キューでブロックされます。

これは本質的に@John Vintが提案するセマフォ制御の形式ですが、さらにいくつかの利点があります-たとえば、ランナブルの継続的な作成/ GCはありません。PooolQueue.size をタイマーで GUI ステータス バーにダンプするのが好きなので、システムがどれだけ「ビジー」であるかを確認できます (また、オブジェクト リークをすばやく検出するため:)

于 2012-07-20T07:55:46.597 に答える
2

ThreadPool を圧倒したくないので、拒否ポリシーを設定する方が良いでしょう。複雑にせずにこれを達成する最善の方法は、次のようにすることです。

final ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(THREADS_COUNT);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

すべてのスレッドがビジー状態になると、呼び出し元のスレッドがタスクを実行します。このようなポリシーCallerRunsPolicy JavaDocへの参照は次のとおりです。

于 2013-07-29T15:28:47.537 に答える