アプリケーションに遅延キューを実装しようとしています。
アイテムをキューに入れる遅延プロデューサーを次に示します。
package com.pra.delayed.queue;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class DelayedProducer {
private static Queue<Delayed> queue;
public DelayedProducer(Queue<Delayed> passedQueue) {
queue = passedQueue;
}
public void putDelayedObj(String xmlMessage, long time) throws InterruptedException{
queue.offer(new DelayedDataObject(xmlMessage, time));
}
public static void main(String[] args) {
Queue<Delayed> localQueue = new DelayQueue<Delayed>();
DelayedProducer delayedProducer = new DelayedProducer(localQueue);
try {
int i=0;
ExecutorService es = Executors.newFixedThreadPool(500);
for(int iTh=0; iTh<300; iTh++){
Runnable consumer = new DelayedConsumer(localQueue);
es.execute(consumer);
}
Thread.sleep(10*1000);
long ttt = System.currentTimeMillis();
while(i<=100000){
delayedProducer.putDelayedObj(i+"", ttt);
i++;
}
es.shutdown();
es.awaitTermination(10,TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Runnable オブジェクトであるコンシューマーを次に示します。
package com.pra.delayed.queue;
import java.util.Queue;
import java.util.concurrent.Delayed;
public class DelayedConsumer implements Runnable{
private static Queue<Delayed> queue;
public DelayedConsumer(Queue<Delayed> passedQueue) {
queue = passedQueue;
}
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
DelayedDataObject ddo = (DelayedDataObject)queue.poll();
if(ddo != null){
System.out.println(queue.size());
}
}
}
100000 個のオブジェクトを入れていますが、300 個のオブジェクトしか取得できません。
実装を修正するのを手伝ってください。コードにある間違いを説明してください。