36

これらのBlockingQueueの実装を見て、それらの違いを理解できません。これまでの私の結論:

  1. SynchronousQueueは必要ありません
  2. LinkedBlockingQueueは FIFO を保証します。BlockingQueueを FIFO にするには、パラメーター true で作成する必要があります
  3. SynchronousQueueは、ほとんどのコレクション メソッドを中断します (contains、size など)。

では、いつSynchronousQueueが必要になるのでしょうか? この実装のパフォーマンスはLinkedBlockingQueueより優れていますか?

さらに複雑にするために...なぜExecutors.newCachedThreadPoolは SynchronousQueue を使用するのに、他のもの ( Executors.newSingleThreadExecutorおよびExecutors.newFixedThreadPool ) は LinkedBlockingQueue を使用するのでしょうか?

編集

最初の疑問は解決しました。しかし、他の人 ( Executors.newSingleThreadExecutorExecutors.newFixedThreadPool ) が LinkedBlockingQueue を使用しているときに、 Executors.newCachedThreadPoolが SynchronousQueue を使用する理由はまだわかりません。

私が得たのは、SynchronousQueue を使用すると、空きスレッドがない場合、プロデューサーがブロックされるということです。ただし、スレッドの数は事実上無制限であるため (必要に応じて新しいスレッドが作成されます)、これは決して起こりません。では、なぜ SynchronousQueue を使用する必要があるのでしょうか。

4

3 に答える 3

56

SynchronousQueueは非常に特別な種類のキューですQueue。 .

したがって、特定のセマンティクスが必要な特別な場合にのみ必要になる場合があります

使用するもう 1 つの理由SynchronousQueueは、パフォーマンスです。の実装はSynchronousQueue非常に最適化されているようです。そのため、ランデブー ポイント以外のものが必要ない場合 ( の場合のようにExecutors.newCachedThreadPool()、コンシューマーが「オンデマンド」で作成されるため、キュー アイテムが蓄積されません)、次のことができます。を使用してパフォーマンスを向上させSynchronousQueueます。

単純な合成テストは、単純な単一の生産者と単一の消費者のシナリオで、デュアルコア マシンのスループットが、キューの長さ = 1 の場合SynchronousQueueのスループットの ~20 倍高いことを示しています。キューの長さが増加すると、スループットは上昇し、ほぼ. これは、他のキューと比較して、マルチコア マシンでの同期オーバーヘッドが低いことを意味します。しかし、繰り返しになりますが、これは、ランデブー ポイントが に偽装される必要がある特定の状況でのみ重要になります。LinkedBlockingQueueArrayBlockingQueueSynchronousQueueSynchronousQueueQueue

編集:

ここにテストがあります:

public class Test {
    static ExecutorService e = Executors.newFixedThreadPool(2);
    static int N = 1000000;

    public static void main(String[] args) throws Exception {    
        for (int i = 0; i < 10; i++) {
            int length = (i == 0) ? 1 : i * 5;
            System.out.print(length + "\t");
            System.out.print(doTest(new LinkedBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new ArrayBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new SynchronousQueue<Integer>(), N));
            System.out.println();
        }

        e.shutdown();
    }

    private static long doTest(final BlockingQueue<Integer> q, final int n) throws Exception {
        long t = System.nanoTime();

        e.submit(new Runnable() {
            public void run() {
                for (int i = 0; i < n; i++)
                    try { q.put(i); } catch (InterruptedException ex) {}
            }
        });    

        Long r = e.submit(new Callable<Long>() {
            public Long call() {
                long sum = 0;
                for (int i = 0; i < n; i++)
                    try { sum += q.take(); } catch (InterruptedException ex) {}
                return sum;
            }
        }).get();
        t = System.nanoTime() - t;

        return (long)(1000000000.0 * N / t); // Throughput, items/sec
    }
}    

そして、これが私のマシンでの結果です:

ここに画像の説明を入力

于 2011-02-24T09:23:40.213 に答える
5

現在、デフォルトExecutors(ThreadPoolExecutorベース) は、固定サイズの事前に作成されたスレッドのセットとBlockingQueueオーバーフロー用のサイズを使用するか、そのキューがいっぱいの場合 (およびその場合にのみ) に最大サイズまでスレッドを作成できます。

これにより、いくつかの驚くべき特性が導き出されます。たとえば、追加のスレッドはキューの容量に達した場合にのみ作成されるため、LinkedBlockingQueue(無制限の) a を使用すると、現在のプール サイズがゼロであっても、新しいスレッドが作成されないことを意味します。を使用するArrayBlockingQueue場合、新しいスレッドは満杯の場合にのみ作成され、プールがそれまでにスペースをクリアしていない場合、後続のジョブが拒否される可能性が十分にあります。

A のSynchronousQueue容量はゼロであるため、プロデューサーは、コンシューマーが使用可能になるかスレッドが作成されるまでブロックされます。これは、@axtavt によって生成された見栄えの良い数値にもかかわらず、キャッシュされたスレッド プールは一般に、プロデューサーの観点からは最悪のパフォーマンスを示すことを意味します。

残念ながら、現時点では、バーストまたはアクティビティ中に低い最小値から最大値までスレッドを作成する、妥協の実装の優れたライブラリ バージョンはありません。拡張可能なプールまたは固定プールのいずれかがあります。社内に 1 つありますが、まだ一般に公開する準備はできていません。

于 2011-02-24T21:18:17.273 に答える
4

キャッシュ スレッド プールは、必要に応じてスレッドを作成します。待機中のコンシューマーにタスクを渡すか失敗するキューが必要です。待機中のコンシューマがない場合は、新しいスレッドを作成します。SynchronousQueue は要素を保持せず、代わりに要素を渡すか失敗します。

于 2011-02-24T09:21:40.307 に答える