26

BlockingQueueaが空になるまでブロックする方法を探しています。

マルチスレッド環境では、プロデューサーがアイテムを に入れている限りBlockingQueue、キューが空になり、数ナノ秒後にアイテムでいっぱいになる状況が発生する可能性があることを私は知っています。

ただし、プロデューサーが1 つしかない場合は、アイテムをキューに入れるのを停止した後、キューが空になるまで待機 (およびブロック) する必要がある場合があります。

Java/疑似コード:

// Producer code
BlockingQueue queue = new BlockingQueue();

while (having some tasks to do) {
    queue.put(task);
}

queue.waitUntilEmpty(); // <-- how to do this?

print("Done");

何か考えはありますか?

EDIT:ラップBlockingQueueして追加の条件を使用するとうまくいくことはわかっています。事前に作成されたソリューションやより良い代替案があるかどうかを尋ねているだけです。

4

6 に答える 6

21

wait()と を使用した簡単なソリューションnotify():

// Producer:
synchronized(queue) {
    while (!queue.isEmpty())
        queue.wait(); //wait for the queue to become empty
    queue.put();
}

//Consumer:
synchronized(queue) {
    queue.get();
    if (queue.isEmpty())
        queue.notify(); // notify the producer
}
于 2013-03-03T11:06:02.257 に答える
7

すでに多数のスレッドがアクティブにポーリングまたはキューを取得している可能性があることは理解していますが、フロー/設計についてはまだ正しくないと感じています。

キューが空になったからといって、以前に追加されたタスクが終了したわけではありません。一部のアイテムは処理に時間がかかる可能性があるため、空であることを確認することはあまり役に立ちません。

したがって、 を忘れて、BlockingQueue他のコレクションとして使用できます。アイテムを に変換し、CollectionsCallable利用しExecutorService.invokeAll()ます。

    Collection<Item> queue = ...
    Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>();

    for (Item item : queue) {
        tasks.add(new Callable<Result>() {

            @Override
            public Result call() throws Exception {
                // process the item ...

                return result;
            }
        });
    }

    // look at the results, add timeout for invokeAll if necessary
    List<Future<Result>> results = executorService.invokeAll(tasks);

    // done

このアプローチにより、プロデューサーが待機できる時間と適切な例外処理を完全に制御できます。

于 2013-07-11T17:43:52.010 に答える
4

やりたいこととはまったく異なりますが、を使用するSynchronousQueueと、Java/疑似コードと非常によく似た効果が得られます。つまり、消費者がすべてのデータを取得するまで生産者がブロックされます。

唯一の違いは、最後に一度だけではなく、コンシューマーがデータを取得するようになるまで、各プットでプロデューサーがブロックすることです。それがあなたのケースに違いをもたらすかどうかはわかりません。プロデューサーによって実行されるタスクが多少高価である場合、顕著な違いが生じるだけだと思います。

于 2013-03-03T11:12:48.630 に答える
2

ほとんどの場合、キューがいっぱいになったときにのみプロデューサーをブロックし、空になるまで待機しないため、ユースケースは非常に特別なものにする必要があります。

とにかく、これは実行可能です。isEmptyプロデューサーがローカルでスピンする、つまり、バスを叩くのではなく、独自のキャッシュにアクセスするため、true が返されるまでスピンすることはそれほど非効率的ではないと思います。ただし、スレッドがスケジュール可能なままであるため、CPU 時間が消費されます。しかし、ローカルスピニングは間違いなく簡単な方法です. それ以外の場合は、次の 2 つのオプションが表示されます。

  1. @niculare のようにwait+を使用することをお勧めしますnotify
  2. どういうわけか、ロックのない方法でプロデューサーに通知するために、キューが空であることを認識した最初のコンシューマーを作成します。これは遅くなりますが、「より」優雅に劣化します
于 2013-03-03T11:29:45.953 に答える
1

@niculareの答えはOKのようですが、@hankduanのコメントをチェックしてください。

同様の問題の解決策を探しているときに、この質問を見ました。
最後に、多かれ少なかれ書き直しLinkedBlockingQueueました。メソッドのサブセットがあり、 or
を実装していません。 複数のプロデューサーとコンシューマーを管理します。 私にとってはうまくいきます。CollectionIterable

/**********************************************************************************************************************
 * Import specifications
 *********************************************************************************************************************/
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

/**********************************************************************************************************************
 * This class implements a completely reentrant FIFO.
 *********************************************************************************************************************/
public class BlockingFIFO<E>
{
  /********************************************************************************************************************
   * The constructor creates an empty FIFO with a capacity of {@link Integer#MAX_VALUE}.
   *******************************************************************************************************************/
  public BlockingFIFO()
  {
    // -----------------
    // Initialize object
    // -----------------
    this(Integer.MAX_VALUE);

  } // constructor

  /********************************************************************************************************************
   * The constructor creates an empty FIFO with the specified capacity.
   *
   * @param capacity_ipar The maximum number of elements the FIFO may contain.
   *******************************************************************************************************************/
  public BlockingFIFO(int capacity_ipar)
  {
    // ---------------------
    // Initialize attributes
    // ---------------------
    lock_attr = new ReentrantLock();
    not_empty_attr = lock_attr.newCondition();
    not_full_attr = lock_attr.newCondition();
    head_attr = null;
    tail_attr = null;
    capacity_attr = capacity_ipar;
    size_attr = 0;

  } // constructor

  /********************************************************************************************************************
   * This method removes all of the elements from the FIFO.
   *
   * @return The number of elements in the FIFO before it was cleared.
   *******************************************************************************************************************/
  public int clear()
  {
    // -----------------
    // Initialize result
    // -----------------
    int result;
    result = 0;

    // ----------
    // Clear FIFO
    // ----------
    lock_attr.lock();
    try
    {
      result = size_attr;
      head_attr = null;
      tail_attr = null;
      size_attr = 0;
      not_full_attr.signalAll();
    }
    finally
    {
      lock_attr.unlock();
    }

    // ----
    // Done
    // ----
    return (result);

  } // clear

  /********************************************************************************************************************
   * This method returns the number of elements in the FIFO.
   *
   * @return The number of elements in the FIFO.
   *******************************************************************************************************************/
  public int size()
  {
    // -----------
    // Return size
    // -----------
    lock_attr.lock();
    try
    {
      return (size_attr);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // size

  /********************************************************************************************************************
   * This method returns the number of additional elements that the FIFO can ideally accept without blocking.
   *
   * @return The remaining capacity the FIFO.
   *******************************************************************************************************************/
  public int remainingCapacity()
  {
    // -------------------------
    // Return remaining capacity
    // -------------------------
    lock_attr.lock();
    try
    {
      return (capacity_attr - size_attr);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // remainingCapacity

  /********************************************************************************************************************
   * This method waits for the FIFO to become empty.
   *
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public void waitEmpty()
    throws InterruptedException
  {
    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    lock_attr.lock();
    try
    {
      while (size_attr > 0)
        not_full_attr.await();
    }
    finally
    {
      lock_attr.unlock();
    }

  } // waitEmpty

  /********************************************************************************************************************
   * This method waits at most the specified time for the FIFO to become empty.
   * <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
   *
   * @param  timeout_ipar         The maximum number of milliseconds to wait for the FIFO to become empty.
   * @return                      True if and only if the FIFO is empty.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public boolean waitEmpty(long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    lock_attr.lock();
    try
    {
      while (size_attr > 0)
      {
        if (!not_full_attr.awaitUntil(deadline))
          return (false);
      }
      return (true);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // waitEmpty

  /********************************************************************************************************************
   * This method waits at most the specified time for the FIFO to become empty.
   * <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
   *
   * @param  timeout_ipar         The maximum time to wait for the FIFO to become empty.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      True if and only if the FIFO is empty.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public boolean waitEmpty(long    timeout_ipar,
                           TimeUnit unit_ipar)
    throws InterruptedException
  {
    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    return (waitEmpty(unit_ipar.toMillis(timeout_ipar)));

  } // waitEmpty

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO if it is possible to do so immediately without
   * exceeding the queue's capacity.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar The element to add to the FIFO.
   * @return              True if and only if the element was added to the FIFO.
   *******************************************************************************************************************/
  public boolean offer(E element_ipar)
  {
    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      if (capacity_attr > size_attr)
      {
        push(element_ipar);
        return (true);
      }
      else
        return (false);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
   * for space to become available.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar         The element to add to the FIFO.
   * @param  timeout_ipar         The maximum number of milliseconds to wait for space to become available.
   * @return                      True if and only if the element was added to the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public boolean offer(E    element_ipar,
                       long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      while (size_attr == capacity_attr)
      {
        if (!not_full_attr.awaitUntil(deadline))
          return (false);
      }
      push(element_ipar);
      return (true);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
   * for space to become available.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar         The element to add to the FIFO.
   * @param  timeout_ipar         The maximum time to wait for space to become available.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      True if and only if the element was added to the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public boolean offer(E        element_ipar,
                       long     timeout_ipar,
                       TimeUnit unit_ipar)
    throws InterruptedException
  {
    // ----------------------------
    // Try to add specified element
    // ----------------------------
    return (offer(element_ipar, unit_ipar.toMillis(timeout_ipar)));

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary for space to become available.
   *
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public void put(E element_ipar)
    throws InterruptedException
  {
    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      while (size_attr == capacity_attr)
        not_full_attr.await();
      push(element_ipar);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // put

  /********************************************************************************************************************
   * This method retrieves, but does not remove, the head of the FIFO, or returns <code>null</code> if the FIFO is
   * empty.
   *
   * @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
   *******************************************************************************************************************/
  public E peek()
  {
    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      if (size_attr == 0)
        return (null);
      else
        return (head_attr.contents);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // peek

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, or returns <code>null</code> if the FIFO is
   * empty.
   *
   * @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
   *******************************************************************************************************************/
  public E poll()
  {
    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      if (size_attr == 0)
        return (null);
      else
        return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
   * element to become available.
   * <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
   *
   * @param  timeout_ipar         The maximum number of milliseconds to wait for an element to become available.
   * @return                      The head of the FIFO, or <code>null</code> if the specified waiting time elapses
   *                              before an element is available.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E poll(long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      while (size_attr == 0)
      {
        if (!not_empty_attr.awaitUntil(deadline))
          return (null);
      }
      return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
   * element to become available.
   * <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
   *
   * @param  timeout_ipar         The maximum time to wait for an element to become available.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      The head of the FIFO, or <code>null</code> if the specified waiting time elapses
   *                              before an element is available.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E poll(long     timeout_ipar,
                TimeUnit unit_ipar)
    throws InterruptedException
  {
    // ------------------------
    // Try to get first element
    // ------------------------
    return (poll(unit_ipar.toMillis(timeout_ipar)));

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting if necessary for an element to become available.
   *
   * @return                      The head of the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public E take()
    throws InterruptedException
  {
    // ---------------------------
    // Try to return first element
    // ---------------------------
    lock_attr.lock();
    try
    {
      while (size_attr == 0)
        not_empty_attr.await();
      return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // take

  /********************************************************************************************************************
   * This class implements as node within the FIFO.
   *******************************************************************************************************************/
  private class Node
  {
    E    contents;
    Node next;

  } // class Node

  /********************************************************************************************************************
   * This method adds the specified element to the end of the FIFO.
   * <br>It sends a signal to all threads waiting for the FIFO to contain something.
   * <br>The caller should have locked the object and have made sure the list is not full.
   *******************************************************************************************************************/
  private void push(E element_ipar)
  {
    // -----------
    // Create node
    // -----------
    Node node;
    node = new Node();
    node.contents = element_ipar;
    node.next = null;

    // --------------
    // Add to the end
    // --------------
    if (head_attr == null)
      head_attr = node;
    else
      tail_attr.next = node;
    tail_attr = node;

    // ----------------------
    // We got another element
    // ----------------------
    size_attr++;
    not_empty_attr.signalAll();

  } // push

  /********************************************************************************************************************
   * This method removes the first element from the FIFO and returns it.
   * <br>It sends a signal to all threads waiting for the FIFO to have space available.
   * <br>The caller should have locked the object and have made sure the list is not empty.
   *******************************************************************************************************************/
  private E pop()
  {
    // ------------
    // Isolate node
    // ------------
    Node node;
    node = head_attr;
    head_attr = node.next;
    if (head_attr == null)
      tail_attr = null;

    // --------------------------
    // We removed another element
    // --------------------------
    size_attr--;
    not_full_attr.signalAll();

    // ----
    // Done
    // ----
    return (node.contents);

  } // pop

  /********************************************************************************************************************
   * This attribute represents the lock on the FIFO.
   *******************************************************************************************************************/
  private Lock lock_attr;

  /********************************************************************************************************************
   * This attribute represents the condition of the FIFO not being empty.
   *******************************************************************************************************************/
  private Condition not_empty_attr;

  /********************************************************************************************************************
   * This attribute represents the condition of the FIFO not being full.
   *******************************************************************************************************************/
  private Condition not_full_attr;

  /********************************************************************************************************************
   * This attribute represents the first element of the FIFO.
   *******************************************************************************************************************/
  private Node head_attr;

  /********************************************************************************************************************
   * This attribute represents the last element of the FIFO.
   *******************************************************************************************************************/
  private Node tail_attr;

  /********************************************************************************************************************
   * This attribute represents the capacity of the FIFO.
   *******************************************************************************************************************/
  private int capacity_attr;

  /********************************************************************************************************************
   * This attribute represents the size of the FIFO.
   *******************************************************************************************************************/
  private int size_attr;

} // class BlockingFIFO
于 2018-04-17T14:22:25.753 に答える
-1

documentationによると、 BlockingQueue はメソッドによって「削除」でブロックできます:take()またはpoll(time, unit)代わりにpoll()

于 2016-06-20T10:41:13.913 に答える