21

キュー内のオブジェクトをバッチ処理できる必要があることを除いて、JavaBlockingQueueと同じデータ構造に興味があります。つまり、プロデューサーがオブジェクトをキューに入れることができるようにしたいのですが、take()キューが特定のサイズ(バッチサイズ)に達するまでコンシューマーブロックをオンにします。

次に、キューがバッチサイズに達すると、プロデューサーはput()、コンシューマーがキュー内のすべての要素を消費するまでブロックする必要があります(この場合、プロデューサーは再び生産を開始し、コンシューマーはバッチに再び達するまでブロックします)。

同様のデータ構造が存在しますか?または、それを書く必要があります(私は気にしません)、何かがそこにある場合、私は自分の時間を無駄にしたくありません。


アップデート

多分少し物事を明確にするために:

状況は常に次のようになります。複数のプロデューサーがキューにアイテムを追加することもできますが、キューからアイテムを取得するコンシューマーが複数存在することはありません。

さて、問題は、これらのセットアップが並列および直列に複数あることです。つまり、プロデューサーは複数のキューのアイテムを作成しますが、コンシューマー自体もプロデューサーになることができます。これは、生産者、消費者-生産者、そして最終的には消費者の有向グラフとしてより簡単に考えることができます。

キューが空になるまでプロデューサーがブロックする必要がある理由(@Peter Lawrey)は、これらのそれぞれがスレッドで実行されるためです。スペースが利用可能になったときに単純に生成するためにそれらを残すと、一度に処理しようとするスレッドが多すぎるという状況になります。

たぶんこれを実行サービスと組み合わせると問題が解決するでしょうか?

4

5 に答える 5

16

BlockingQueue.drainTo(Collection、int)を使用することをお勧めします。take()とともに使用して、最小数の要素を確実に取得できます。

このアプローチを使用する利点は、バッチサイズがワークロードに応じて動的に増加し、コンシューマーがビジー状態のときにプロデューサーがブロックする必要がないことです。つまり、レイテンシとスループットを自己最適化します。


要求どおりに正確に実装するには(これは悪い考えだと思います)、ビジー状態のスレッドでSynchronousQueueを使用できます。

つまり、消費スレッドは

 list.clear();
 while(list.size() < required) list.add(queue.take());
 // process list.

消費者が忙しいときはいつでも、プロデューサーはブロックします。

于 2012-02-27T14:45:35.847 に答える
2

これは、リクエストに適していると思われる簡単な(=単純ですが、完全にはテストされていない)実装です。必要に応じて、フルキューインターフェイスをサポートするように拡張できるはずです。

パフォーマンスを向上させるために、「synchronized」キーワードを使用する代わりにReentrantLockに切り替えることができます。

public class BatchBlockingQueue<T> {

    private ArrayList<T> queue;
    private Semaphore readerLock;
    private Semaphore writerLock;
    private int batchSize;

    public BatchBlockingQueue(int batchSize) {
        this.queue = new ArrayList<>(batchSize);
        this.readerLock = new Semaphore(0);
        this.writerLock = new Semaphore(batchSize);
        this.batchSize = batchSize;
    }

    public synchronized void put(T e) throws InterruptedException {
        writerLock.acquire();
        queue.add(e);
        if (queue.size() == batchSize) {
            readerLock.release(batchSize);
        }
    }

    public synchronized T poll() throws InterruptedException {
        readerLock.acquire();
        T ret = queue.remove(0);
        if (queue.isEmpty()) {
            writerLock.release(batchSize);
        }
        return ret;
    }

}

お役に立てば幸いです。

于 2012-07-04T22:42:40.627 に答える
1

私が知っていることではありません。私が正しく理解している場合は、プロデューサーがキューをいっぱいにするまで(コンシューマーがブロックされている間)動作するか、コンシューマーがキューをクリアするまで(プロデューサーがブロックしている間)動作するようにします。その場合は、データ構造は必要ありませんが、一方のパーティがミューテックスファションで動作しているときに一方のパーティをブロックするメカニズムが必要になることをお勧めします。そのためにオブジェクトをロックし、内部的に完全か空かというロジックを使用して、ロックを解除して相手に渡すことができます。つまり、自分で書く必要があります:)

于 2012-02-27T14:38:53.920 に答える
1

これは、RingBufferがLMAXディスラプターパターンでどのように機能するかと同じように聞こえます。詳細については、 http://code.google.com/p/disruptor/を参照してください。

非常に大まかな説明は、主なデータ構造がRingBufferであるということです。プロデューサーはデータを順番にリングバッファーに入れ、コンシューマーはプロデューサーがバッファーに入れたのと同じ量のデータをプルオフできます(つまり、基本的にバッチ処理)。バッファーがいっぱいの場合、プロデューサーは、コンシューマーが終了してバッファー内のスロットを解放するまでブロックします。

于 2012-07-04T21:50:11.133 に答える
1

私は最近、キュー要素がバッチサイズに達しない場合に、フラッシュタイムアウトを使用してBlockingQueue要素をバッチ処理するこのユーティリティを開発しました。また、複数のインスタンスを使用して同じデータセットを作成するfanOutパターンもサポートしています。

// Instantiate the registry
FQueueRegistry registry = new FQueueRegistry();

// Build FQueue consumer
registry.buildFQueue(String.class)
                .batch()
                .withChunkSize(5)
                .withFlushTimeout(1)
                .withFlushTimeUnit(TimeUnit.SECONDS)
                .done()
                .consume(() -> (broadcaster, elms) -> System.out.println("elms batched are: "+elms.size()));

// Push data into queue
for(int i = 0; i < 10; i++){
        registry.sendBroadcast("Sample"+i);
}

詳細はこちら!

https://github.com/fulmicotone/io.fulmicotone.fqueue

于 2020-04-20T14:44:58.077 に答える