1

take()どのように機能し、キューにプッシュされた「高速」要素を消費するのに適した方法であるかどうかを理解したいと思います。

それがどのように機能するかを理解するために、ここではオブザーバーパターンを考慮していないことに注意してください。そのパターンを使用してイベントに「迅速に反応」できることは知っていますが、それは私の質問の目的ではありません.

たとえば、BlockingQueue(ほとんどが空の) スレッドがあり、要素がそのキューにプッシュされて消費されるのを待っているスレッドが「スタック」している場合、その間に費やされる時間を最小限に抑える (レイテンシを減らす) 良い方法は何でしょうか?要素がキューにプッシュされた瞬間とそれが消費された瞬間?

たとえば、これを行うスレッドの違いは何ですか:

while( true ) {
   elem = queue.peek();
   if ( elem == null ) {
       Thread.sleep( 25 ); // prevents busy-looping
   } else {
   ... // do something here
   }
}

そして、これを行う別の人:

while ( true ) {
    elem = queue.take();
    ... // do something with elem here
}

(単純化するために、ここで例外について議論することは無視できると思います!?)

電話をかけたときにtake()キューが空になったとき、内部では何が行われているのでしょうか? JVM は、キューに何かがあるかどうかを常にチェックするビジー ループを行うことができないため、内部でスレッドを「スリープ」する必要がありますか? take()は内部で CAS 操作を使用していますか? もしそうなら、take()がその CAS 操作を呼び出す頻度を決定するものは何ですか?

何かが突然キューに入れられたらどうしますか? そのスレッドは、take()どうにかしてブロックされ、すぐに行動する必要があることを「通知」されますか?

最後に、アプリケーションの存続期間中、BlockingQueue のtake()で1 つのスレッドが「スタック」するのは「一般的」ですか?

これはすべて、ブロッキングtake()がどのように機能するかに関する 1 つの大きな疑問であり、さまざまな質問 (少なくとも意味のある質問) に答えることが、これらすべてをよりよく理解するのに役立つと思います。

4

5 に答える 5

1

内部的には、メソッドで通知される条件をtake待機します。つまり、待機中のスレッドはスリープ状態になり、これでウェイクアップし ます。これは高速である必要があります。notEmptyinsertinsert.

ArrayBlockingQueueや などの一部のブロッキング キューにSynchronousQueueは、キューの公平性プロパティを受け入れるコンストラクタがあります。渡すことでtrue、スレッドがスタックするのを防ぐ必要take,があります。(このパラメーターは、基になるReentrantLockが公平かどうかを指定します。)

于 2013-04-22T19:31:00.797 に答える
1

さて、これがの実装ですLinkedBlockingQueue<E>.take()

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
            while (count.get() == 0) {
                notEmpty.await();
            }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

キューが空の場合、notEmpty.await()が呼び出されます。

現在のスレッドに、シグナルが送信されるか中断されるまで待機させます。

この Condition に関連付けられたロックはアトミックに解放され、現在のスレッドはスレッド スケジューリングの目的で無効になり、次の 4 つのいずれかが発生するまで休止状態になります。

  1. 他のスレッドがこの Condition のシグナル メソッドを呼び出し、現在のスレッドがたまたま起動されるスレッドとして選択されます。また
  2. 他のスレッドがこの Condition の signalAll メソッドを呼び出します。また
  3. 他のスレッドが現在のスレッドに割り込んでおり、スレッド中断の中断がサポートされています。また
  4. 「偽のウェイクアップ」が発生します。

別のスレッドが何かをキューに入れると、 が呼び出さsignalれ、このキューからアイテムを消費するのを待っているスレッドの 1 つが目覚めます。peekこれは/sleepループよりも速く動作するはずです。

于 2013-04-22T19:31:24.157 に答える
0

違いは、最初のスレッドが最大 25 ミリ秒も長くスリープするのに対し、2 番目のスレッドはまったく時間を無駄にしないことです。

于 2013-04-22T23:15:59.603 に答える