5

常に少なくとも5つのスレッド、最大20のスレッド、およびタスクの無制限のキュー(タスクが拒否されないことを意味する)を持つExecutorを作成する方法はありますか?

ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) 私はキューのために考えたすべての可能性で新しいことを試みました:

new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected

そして、誰も望み通りに機能しませんでした。

4

3 に答える 3

5

たぶん、このようなものがあなたのために働くでしょうか?泡立てただけなので、突いてください。基本的に、基になるフィードに使用されるオーバーフロースレッドプールを実装しますThreadPoolExecutor

私がそれで見る2つの主要な欠点があります:

  • に返されるFutureオブジェクトがないsubmit()。しかし、それはあなたにとって問題ではないかもしれません。
  • ThreadPoolExecutorセカンダリキューは、ジョブが送信されたときにのみ空になります。エレガントな解決策が必要ですが、まだわかりません。着実にタスクが流れ込むことがわかっている場合はStusMagicExecutor、これは問題ではない可能性があります。(「5月」がキーワードです。)オプションは、提出されたタスクStusMagicExecutorが完了した後に突くようにすることである可能性がありますか?

StuのMagicExecutor:

public class StusMagicExecutor extends ThreadPoolExecutor {
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>();  //capacity is Integer.MAX_VALUE.

    public StusMagicExecutor() {
        super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());  
    }
    public void queueRejectedTask(Runnable task) {
        try {
            secondaryQueue.put(task);
        } catch (InterruptedException e) {
            // do something
        }
    }
    public Future submit(Runnable newTask) {
        //drain secondary queue as rejection handler populates it
        Collection<Runnable> tasks = new ArrayList<Runnable>();
        secondaryQueue.drainTo(tasks);

        tasks.add(newTask);

        for (Runnable task : tasks)
             super.submit(task);

        return null; //does not return a future!
    }
}

class RejectionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        ((StusMagicExecutor)executor).queueRejectedTask(runnable);
    }
}
于 2009-09-17T08:05:51.240 に答える
1

この問題はクラスの欠点であり、コンストラクターパラメーターの組み合わせを考えると非常に誤解を招くと思います。これは、私がトップレベルのクラスにしたSwingWorkerの内部ThreadPoolExecutorから取得したソリューションです。最小値はありませんが、少なくとも上限を使用します。私が知らない唯一のことは、ロッキング実行からどのようなパフォーマンスヒットが得られるかということです。

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean isPaused = false;
    private final ReentrantLock executeLock = new ReentrantLock();

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                handler);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        executeLock.lock();
        try {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
            setCorePoolSize(getMaximumPoolSize());
            super.execute(command);
            setCorePoolSize(0);
            pauseLock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                pauseLock.unlock();
            }
        } finally {
            executeLock.unlock();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ignore) {

        } finally {
            pauseLock.unlock();
        }
    }
}
于 2010-03-31T16:01:36.507 に答える
1

のjavadocsは、スレッドが作成されると、キューがいっぱいになったときにのみ新しいスレッドが作成されることThreadPoolExecutorを明確に示しています。corePoolSizeしたがって、core5とmax20に設定すると、希望する動作が得られなくなります。

ただし、との両方を20に設定するcoremax、20のスレッドすべてがビジーの場合にのみ、タスクがキューに追加されます。もちろん、これにより、「最小5スレッド」の要件は少し意味がなくなります。これは、20個すべてが(とにかくアイドル状態になるまで)存続するためです。

于 2009-09-16T23:15:15.080 に答える