5

1つのプロデューサースレッドと複数のコンシューマースレッドを持つマルチスレッドアプリケーションがあります。データは共有スレッドセーフコレクションに保存され、バッファに十分なデータがある場合はデータベースにフラッシュされます。

javadocsから-

BlockingQueue<E>

要素を取得するときにキューが空でなくなるのを待ち、要素を格納するときにキューでスペースが使用可能になるのを待つ操作を追加でサポートするキュー。

take()

このキューの先頭を取得して削除し、必要に応じて要素が使用可能になるまで待機します。

私の質問-

  1. E [] take(int n)メソッドを持つ別のコレクションはありますか?つまり、ブロッキングキューは、要素が使用可能になるまで待機します。私が欲しいのは、100個または200個の要素が使用可能になるまで待機する必要があるということです。
  2. または、ポーリングせずに問題に対処するために使用できる別の方法はありますか?
4

4 に答える 4

2

このdrainTo方法はあなたが探しているものとは正確には一致しませんが、それはあなたの目的に役立ちますか?

http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection、int

編集

takeとの組み合わせを使用して、わずかにパフォーマンスの高いバッチブロッキングtakeminを実装できますdrainTo

public <E> void drainTo(final BlockingQueue<E> queue, final List<E> list, final int min) throws InterruptedException
{
  int drained = 0;
  do 
  {
    if (queue.size() > 0)
      drained += queue.drainTo(list, min - drained);
    else
    {
      list.add(queue.take());
      drained++;
    }
  }
  while (drained < min);
}
于 2012-06-07T10:08:27.463 に答える
2

BlockingQueue唯一の方法は、以下を使用して、実装を拡張するか、ある種のユーティリティメソッドを作成することだと思いますtake

public <E> void take(BlockingQueue<E> queue, List<E> to, int max) 
        throws InterruptedException {

    for (int i = 0; i < max; i++)
        to.add(queue.take());
}
于 2012-06-07T10:16:24.610 に答える
1

標準ライブラリにtypeメソッドを持つ同様のクラスがあるかどうかはわかりませんが、デフォルトをラップして、それほど面倒なことなくその関数を追加take(int n)できるはずですよね。BlockingQueue

別のシナリオは、コレクションに要素を配置するアクションをトリガーすることです。ここで、ユーザーが設定したしきい値がフラッシュをトリガーします。

于 2012-06-07T10:08:22.050 に答える
1

したがって、これは、任意の数の要素の取得をブロックできるスレッドセーフキューである必要があります。スレッデッドコードが正しいことを確認するためのより多くの目が歓迎されます。

package mybq;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ChunkyBlockingQueue<T> {
    protected final LinkedList<T> q = new LinkedList<T>();
    protected final Object lock = new Object();

    public void add(T t) {
        synchronized (lock) {
            q.add(t);
            lock.notifyAll();
        }
    }

    public List<T> take(int numElements) {
        synchronized (lock) {
            while (q.size() < numElements) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            ArrayList<T> l = new ArrayList<T>(numElements);
            l.addAll(q.subList(0, numElements));
            q.subList(0, numElements).clear();
            return l;
        }
    }
}
于 2012-06-07T10:18:19.667 に答える