6

Java は、バッチ処理を処理するためのキュー オブジェクトまたはメカニズムをサポートしていますか?

例:キュー(または任意のキューオブジェクト)があり、一部のプロデューサーがアイテムをキューに1つずつプッシュします。私のターゲットは、このキューに10個のアイテムまたは10個を超えるアイテムがある場合です。ハンドラーをトリガーして処理できます1バッチで。

または自動的にトリガーされない場合は、ハンドラー側でキューを適切にループする方法を見つける必要があります。

これを処理するための典型的で高性能なオブジェクトまたはライブラリはありますか?

ありがとう、エムレ

4

5 に答える 5

2

Queue でのバッチ処理は、待機/通知を使用して実現できます。リソースが利用可能かどうかにかかわらず、リソースに対するスレッド呼び出しをブロックするようなものです。

public class MyQueue implements Queue<Object>{
        public synchronized List<Object> peek() {
        if(this.list.size()>=10)
                  this.list.wait();
        return Collections.subList(0,10);
    }
        @Override
    public boolean add(Object e) {
        this.list.add(e);
                if(this.list.size()>=10)
                  this.list.notifyAll(); 
        return false;
    }
}

自動的にトリガーされません

その場合、指定したタイムアウトで待機を呼び出すことができます。

于 2012-10-08T09:17:44.133 に答える
1

を使用BlockingQueue.drainTo()して、実行するタスクのバッチを自動的に取得できます。これは、1秒あたり100Kを超えるタスクに適しています。

より高性能なキューイングが必要な場合は、より複雑なDisruptorまたはJava Chronicleを使用できます。これらは、自動バッチ処理をサポートする、毎秒数百万のタスクをキューに入れることができます。

于 2012-10-08T09:29:13.050 に答える
1

バックグラウンド スレッドを使用して、他のスレッドによってキューにプッシュされたオブジェクトを収集して処理する、バッチでオブジェクトを処理する簡単な試みを次に示します。

public abstract class Batcher<E> implements Runnable {

    public static interface BatchProcessor<E> {
        public void processBatch(List<E> batch);
    }

    private final BlockingQueue<E> queue;
    private final BatchProcessor<E> processor;

    private Batcher(BlockingQueue<E> queue, BatchProcessor<E> processor) {
        this.queue = queue;
        this.processor = processor;
    }

    @Override
    public void run() {
        try {
            while (true) {
                List<E> batch = new ArrayList<E>();
                for (int i = 0; i < 10; i++) {
                    batch.add(queue.take());
                }
                processor.processBatch(batch);
            }
        } catch (InterruptedException e) {
            return;
        }
    }

}

これを使用するには、 を作成してBlockingQueueその上にオブジェクトを配置し、 の実装のインスタンスをBatchProcessor作成してバッチを処理し、 のインスタンスを作成Batcherして前者から後者にオブジェクトを送り込みます。

于 2012-10-08T10:37:20.207 に答える
0

java.util.Queueいくつかの実装がある interface の API ドキュメントを見てください。

異なるプロセス間でメッセージを交換するためのキューイング システムを処理するための標準 API であるJava Message Service (JMS)もあります。

于 2012-10-08T08:55:36.063 に答える
0

CountDownLatchが必要なものか、おそらくCyclicBarrierだと思います。これにより、一定数の操作が発生した後にコンシューマーをトリガーする同期ポイントをセットアップでき、標準キューをコンテナー オブジェクトとして使用できます。

于 2012-10-08T09:00:58.917 に答える