12

キュー サーバーからデータを取得しており、それを処理して確認を送信する必要があります。このようなもの:

while (true) {
    queueserver.get.data
    ThreadPoolExecutor //send data to thread
    queueserver.acknowledgement 

スレッドで何が起こっているのか完全には理解していませんが、このプログラムはデータを取得してスレッドに送信し、すぐにそれを確認していると思います。したがって、各キューに 200 個の未確認アイテムしか持てないという制限がある場合でも、それを受信できる限り速くプルします。これは、単一のサーバーでプログラムを作成する場合には適していますが、複数のワーカーを使用している場合は、スレッド キュー内のアイテムの量が完了した作業を反映したものではなく、作業の速さを反映していないため、これが問題になります。キュー サーバーからアイテムを取得できます。

スレッド キューがいっぱいになった場合にプログラムを待機させるためにできることはありますか?

4

4 に答える 4

27

処理する必要があるデータが多すぎる場合、ThreadPoolExecutor コマンドを待機させるにはどうすればよいですか?

無制限のキューの代わりにBlockingQueue、制限付きのを使用できます。

BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);

に送信されるジョブに関しては、 を使用して作成されExecutorServiceたデフォルトの を使用する代わりに、無制限のキューを使用して、独自のジョブを作成できます。ExecutorServiceExecutors

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new ArrayBlockingQueue<Runnable>(200));

キューがいっぱいになると、送信された新しいタスクが拒否されます。RejectedExecutionHandlerキューに送信するを設定する必要があります。何かのようなもの:

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
           0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
      // check afterwards and throw if pool shutdown
      if (executor.isShutdown()) {
         throw new RejectedExecutionException(
              "Task " + r + " rejected from " + e);
      }
   }
});

Java にThreadPoolExecutor.CallerBlocksPolicy.

于 2012-04-27T15:15:23.007 に答える
3

ワーカーがタスクの作業を開始したときに確認応答が必要な場合はThreadFactory、実際の作業を行う前にスレッドから確認応答を送信するカスタムを作成できます。beforeExecuteまたは、をオーバーライドできますThreadPoolExecutor

ThreadPoolExecutor新しいワーカーが新しいタスクのために解放されたときに確認応答が必要な場合は、をaSynchronousQueueとaで初期化するThreadPoolExecutor.CallerRunsPolicyか、呼び出し元がブロックする独自のポリシーで初期化できると思います。

于 2012-04-27T15:21:43.960 に答える
0

まず、擬似コードで行ったことが待機に忙しいため、態度が間違っていると思います。javatoturialhttp : //docs.oracle.com/javase/tutorial/essential/concurrency/の同時実行チュートリアルを読む必要があります。

それを無視して、illはビジーウェイト(推奨されていません)で解決策を提供します:

     ExecutorService e1 =  Executors.newFixedThreadPool(20);
     while (true) {
          if (!serverq.isEmpty() && !myq.isFull()) myq.enq(serverq.poll());
          if (!myq.isEmpty()) e1.execute(myq.poll());
     }

ノート:

1.他の回答で述べられているように、myqが同期されていることを確認します。いくつかのブロッキングキューを拡張して、同期が正しいことを確認できます。

2.サービスの反復でサーバーから実行することを実行するランナブルクラスを実装します。これらのランナブルは、コンストラクターへのパラメーターとしてmyqを取得し、グローバル変数として保存する必要があります。

3.myqはランナブルを取得します。そのrunメソッドの最後で、ランナブルがmyqから自分自身を削除することを確認する必要があります。

于 2012-04-27T16:08:46.350 に答える
0

200 を超えるタスクを実行せず、タスクが完了するのを待ってから 201 のタスクを送信するblockingPool はどうですか。アプリケーションでセマフォを使用してそれを達成しました。コンストラクターに値を渡すことで制限を変更することもできます。

ここでの@Grayの回答との唯一の違いは、この場合、タスクが拒否されることはめったにないことです。セマフォは、他のタスクが終了しない限り、任意の 201 タスクを待機させます。それにもかかわらず、拒否された場合にそのタスクをエグゼキューターに再送信するための拒否ハンドラーがあります。

private class BlockingPool extends ThreadPoolExecutor {
    private final Semaphore semaphore;      
    public BlockingPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int tasksAllowedInThreads){    
        super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                executor.execute(r);
            }
        });
        semaphore = new Semaphore(tasksAllowedInThreads);
    }

    @Override
    public void execute(Runnable task){
        boolean acquired = false;
        do{
            try{
                semaphore.acquire();
                acquired = true;
            } catch (final InterruptedException e){
                // log
            }
        } while (!acquired); // run in loop to handle InterruptedException
        try{
            super.execute(task);
        } catch (final RejectedExecutionException e){
            System.out.println("Task Rejected");
            semaphore.release();
            throw e;
        }
    }    

    @Override
    protected void afterExecute(Runnable r, Throwable t){
        super.afterExecute(r, t);
        if (t != null){
            t.printStackTrace();
        }
        semaphore.release();
    }
}

これは理にかなっていますか?

于 2015-07-18T07:23:26.197 に答える