21

JavaDoc for ThreadPoolExecutorBlockingQueueは、エグゼキュータのバッキングにタスクを直接追加できるかどうかについては不明です。ドキュメントによると、呼び出しexecutor.getQueue()は「主にデバッグと監視を目的としています」。

私はThreadPoolExecutor自分でを構築していBlockingQueueます。キューへの参照を保持しているので、タスクを直接キューに追加できます。同じキューが返されるgetQueue()ので、の警告はgetQueue()、私の手段で取得したバッキングキューへの参照に適用されると思います。

コードの一般的なパターンは次のとおりです。

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer()vsexecutor.execute()

私が理解しているように、一般的な使用法は、を介してタスクを追加することですexecutor.execute()。上記の私の例のアプローチには、キューをブロックするという利点がありexecute()ますが、キューがいっぱいになり、タスクが拒否されるとすぐに失敗します。また、ジョブの送信がブロッキングキューと相互作用することも気に入っています。これは私にとってより「純粋な」生産者/消費者のように感じます。

キューにタスクを直接追加することの意味:呼び出す必要がありますprestartAllCoreThreads()。そうしないと、ワーカースレッドが実行されません。エグゼキュータと他の相互作用がないと仮定すると、キューを監視するものはありません(ThreadPoolExecutorソースの調査によりこれが確認されます)。これは、直接エンキューのThreadPoolExecutor場合、0を超えるコアスレッド用に追加で構成する必要があり、コアスレッドがタイムアウトできるように構成してはならないことも意味します。

tl; dr

次のようにThreadPoolExecutor構成されている場合:

  • コアスレッド>0
  • コアスレッドはタイムアウトできません
  • コアスレッドは事前に開始されています
  • BlockingQueue遺言執行者の支援への参照を保持する

呼び出す代わりに、タスクをキューに直接追加することはできますexecutor.execute()か?

関連している

この質問(プロデューサー/コンシューマーワークキュー)も同様ですが、キューに直接追加することについては特に説明していません。

4

5 に答える 5

11

1つのトリックは、ArrayBlockingQueueのカスタムサブクラスを実装し、offer()メソッドをオーバーライドしてブロッキングバージョンを呼び出すことです。その後も、通常のコードパスを使用できます。

queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // return interrupt status to caller
      Thread.currentThread().interrupt();
    }
    return false;
  }
};

(おそらく推測できるように、通常のコードパスはおそらく悪い考えなので、キューで直接offerを呼び出すと思います)。

于 2011-04-07T19:32:28.500 に答える
10

それが私なら、私はすでに他のすべてを使用しているという理由だけで、よりも使用Executor#execute()することを好みます。Queue#offer()java.util.concurrent

あなたの質問は良いものであり、それは私の興味をそそったので、私は以下のソースを調べましたThreadPoolExecutor#execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

実行自体がワークキューを呼び出すことがわかりoffer()ますが、必要に応じて、おいしいプール操作を実行する前ではありません。execute()そのため、 ;を使用することをお勧めします。それを使用しないと(確かにはわかりませんが)、プールが最適でない方法で動作する可能性があります。offer()ただし、を使用してもエグゼキュータが破損するとは思いません。タスクは、以下を使用してキューからプルされているようです(これもThreadPoolExecutorから)。

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

このgetTask()メソッドはループ内から呼び出されるだけなので、エグゼキュータがシャットダウンしない場合は、新しいタスクがキューに渡されるまでブロックされます(どこから来たかに関係なく)。

:ここにソースからのコードスニペットを投稿しましたが、決定的な答えをそれらに依存することはできません。APIにコーディングするだけで済みます。execute()の実装が時間の経過とともにどのように変化するかはわかりません。

于 2011-04-07T18:53:38.200 に答える
4

RejectedExecutionHandlerインスタンス化時にを指定することにより、キューがいっぱいになったときのプールの動作を実際に構成できます。ThreadPoolExecutor4つのポリシーを内部クラスとして定義します。これには、、、、および制御スレッドで新しいジョブを実行する私の個人的なお気に入りの、がAbortPolicy含まれます。DiscardOldestPolicyDiscardPolicyCallerRunsPolicy

例えば:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        nproc, // core size
        nproc, // max size
        60, // idle timeout
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
        new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.

質問で望ましい動作は、次のようなものを使用して取得できます。

public class BlockingPolicy implements RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        executor.getQueue.put(r); // Self contained, no queue reference needed.
    }

ある時点で、キューにアクセスする必要があります。これを行うのに最適な場所は、自己完結型RejectedExecutionHandlerであり、プールオブジェクトのスコープでのキューの直接操作から生じるコードの重複や潜在的なバグを保存します。それに含まれるハンドラーはThreadPoolExecutorを使用することに注意してくださいgetQueue()

于 2013-08-30T07:00:25.223 に答える
2

LinkedBlockingQueue使用しているキューが標準のメモリ内またはとは完全に異なる実装であるかどうかは、非常に重要な質問ですArrayBlockingQueue

たとえば、異なるマシンで複数のプロデューサーを使用して生産者/消費者パターンを実装し、別の永続化サブシステム(Redisなど)に基づくキューイングメカニズムを使用する場合、質問は、そうでない場合でも、それ自体で関連性があります。offer()OPのようなブロッキングが必要です。

したがって、ワーカースレッドを使用可能にして実行するには、指定された回答prestartAllCoreThreads()(または十分な回数)を呼び出す必要がありますが、これはストレスを感じるほど重要です。prestartCoreThread()

于 2013-10-25T11:48:45.107 に答える
0

必要に応じて、メイン処理と拒否されたタスクを分離する駐車場を使用することもできます-

    final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT);
    final List<Runnable> taskParking = new LinkedList<Runnable>();
    BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1);
    RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r);
            taskCounter.countDown();
            taskParking.add(r);
        }
    };
    ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler);
    for(int i=0 ; i<TASKCOUNT; i++){
        //main 
        threadPoolExecutor.submit(getRandomTask());
    }
    taskCounter.await(TASKCOUNT * 5 , TimeUnit.SECONDS);
    System.out.println("Checking the parking lot..." + taskParking);
    while(taskParking.size() > 0){
        Runnable r = taskParking.remove(0);
        System.out.println("Running from parking lot..." + r);
        if(taskParking.size() > LIMIT){
          waitForSometime(...);
        }
        threadPoolExecutor.submit(r);
    }
    threadPoolExecutor.shutdown();
于 2015-06-17T12:02:11.053 に答える