7

ExecutorService を使用して、さまざまなスレッドで多くのタスクを実行します。スレッド プールで待機している Runnable インスタンスが多すぎると、メモリ不足の問題が発生することがあります。

それを解決するために、ブロッキングジョブエグゼキュータを作成しようとしています。それを行うための公式の解決策はありますか?

例えば:

    BlockingJobExecutor executor = new BlockingJobExecutor(3);
    for (int i = 0; i < 1000; i++) {
        executor.addJob(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                LogFactory.getLog(BTest.class).info("test " + System.currentTimeMillis());
            }
        });
    }
    executor.shutdown();

BlockingJobExecutor クラスは次のとおりです。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingJobExecutor {

    AtomicInteger counter = new AtomicInteger();
    ExecutorService service;
    int threads;

    public BlockingJobExecutor(int threads) {
        if (threads < 1) {
            throw new IllegalArgumentException("threads must be greater than 1.");
        }
        service = Executors.newFixedThreadPool(threads);
        this.threads = threads;
    }

    static class JobWrapper implements Runnable {
        BlockingJobExecutor executor;
        Runnable job;

        public JobWrapper(BlockingJobExecutor executor, Runnable job) throws InterruptedException {
            synchronized (executor.counter) {
                while (executor.counter.get() >= executor.limit()) {
                    executor.counter.wait();
                }
            }
            this.executor = executor;
            this.job = job;
        }

        @Override
        public void run() {
            try {
                job.run();
            } finally {
                synchronized (executor.counter) {
                    executor.counter.decrementAndGet();
                    executor.counter.notifyAll();
                }
            }
        }
    }

    public int limit() {
        return threads;
    }

    public void shutdown() {
        service.shutdown();
        try {
            service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void addJob(Runnable job) {
        try {
            service.execute(new JobWrapper(this, job));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}
4

1 に答える 1

16

これには 2 つの方法があります。実行待ちのランナブルが多すぎたり、同時に実行中のスレッドが多すぎたりする可能性があります。キューに入れられたジョブが多すぎる場合は、 で固定サイズを使用して、BlockingQueueキューExecutorServiceに入れられるアイテムの数を制限できます。次に、新しいタスクをキューに入れようとすると、キューに空きができるまで操作がブロックされます。

一度に実行するスレッドが多すぎる場合は、ExecutorService でタスクを実行するために使用できるスレッドExecutors.newFixedThreadPoolの数を、必要なスレッド数で呼び出すことによって制限できます。

于 2013-08-08T06:27:30.977 に答える