8

BoundedExecutor『Java Concurrency in Practice』の実装には奇妙なことがあります。

エグゼキュータでキューに入れられているか実行されているスレッドが十分にある場合、送信スレッドをブロックすることにより、エグゼキュータへのタスク送信を抑制することになっています。

これは実装です(catch句に欠落している再スローを追加した後):

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();

        try {
            exec.execute(new Runnable() {
                @Override public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }

とを4の範囲でインスタンス化BoundedExecutorするとExecutors.newCachedThreadPool()、キャッシュされたスレッドプールによってインスタンス化されるスレッドの数が4を超えることはないと予想されますが、実際にはそうなります。私はこの小さなテストプログラムを入手して、最大11個のスレッドを作成しました。

public static void main(String[] args) throws Exception {
    class CountingThreadFactory implements ThreadFactory {
        int count;

        @Override public Thread newThread(Runnable r) {
            ++count;
            return new Thread(r);
        }           
    }

    List<Integer> counts = new ArrayList<Integer>();

    for (int n = 0; n < 100; ++n) {
        CountingThreadFactory countingThreadFactory = new CountingThreadFactory();
        ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory);

        try {
            BoundedExecutor be = new BoundedExecutor(exec, 4);

            for (int i = 0; i < 20000; ++i) {
                be.submitTask(new Runnable() {
                    @Override public void run() {}
                });
            }
        } finally {
            exec.shutdown();
        }

        counts.add(countingThreadFactory.count);
    }

    System.out.println(Collections.max(counts));
}

セマフォのリリースとタスクの終了の間には、リリーススレッドがまだ終了していないときに別のスレッドが許可を取得してタスクを送信できる、ごくわずかな時間枠があると思います。つまり、競合状態があります。

誰かがこれを確認できますか?

4

3 に答える 3

10

BoundedExecutor は、実際には、スレッド プール サイズに制限を設ける方法としてではなく、タスクの送信を調整する方法を示すことを目的としていました。少なくとも 1 つのコメントが指摘しているように、後者を達成するためのより直接的な方法があります。

しかし、他の回答では、無​​制限のキューを使用し、

セマフォは現在実行中のタスクと実行を待っているタスクの数を制限しているため、セマフォの境界をプール サイズに許可するキューに入れられたタスクの数を加えた値に設定します。[JCiP、セクション 8.3.3 の終わり]

制限のないキューとプール サイズに言及することで、(あまり明確ではないように見えますが) 制限のあるサイズのスレッド プールを使用することを暗示していました。

しかし、BoundedExecutor について私がいつも悩まされているのは、それが ExecutorService インターフェースを実装していないことです。同様の機能を実現し、標準インターフェースを実装する最新の方法は、Guava のlistenDecoratorメソッドとForwardingListeningExecutorServiceクラスを使用することです。

于 2012-04-11T15:51:41.580 に答える
5

競合状態の分析は正しいです。ExecutorService と Semaphore の間の同期の保証はありません。

ただし、スレッドの数を調整することが BoundedExecutor の目的であるかどうかはわかりません。サービスに送信されるタスクの数を調整するためのものだと思います。送信する必要がある 500 万のタスクがあり、そのうちの 10,000 を超えるタスクを送信すると、メモリ不足になると想像してください。

いつでも実行できるスレッドは 4 つだけですが、なぜ 500 万のタスクすべてをキューに入れようとするのでしょうか? これと同様の構成を使用して、任意の時点でキューに入れられるタスクの数を調整できます。ここから得られるべきことは、常に 4 つのタスクしか実行されていないということです。

明らかに、これに対する解決策は を使用することExecutors.newFixedThreadPool(4)です。

于 2012-04-10T18:18:04.597 に答える
2

一度に 9 個のスレッドが作成されていることがわかります。必要以上のスレッドが発生する競合状態があると思われます。

これは、タスクの実行前と実行後に行う作業があるためである可能性があります。これは、コード ブロック内に 4 つのスレッドしかない場合でも、前のタスクを停止したり、新しいタスクを開始する準備をしているスレッドが多数あることを意味します。

つまり、スレッドは実行中に release() を実行します。それはあなたが最後に行うことですが、新しいタスクを取得する前に行う最後のことではありません。

于 2012-04-10T18:22:07.630 に答える