あなたのデザインに基づいて、私は以下のようなものを考えることができます。ConsumerTaskはジェネリックスを利用できますが、Producerスレッドで同じことを行う方法を理解するのに苦労しました。生産者と消費者の両方が、生産/消費されるアイテムの数に制限があります。TimerTaskロジックは、TimerTask自体のrun()メソッドでタイマーをキャンセルして停止するために不可欠です。この場合、シャットダウンに使用できるのはPOISONPILLアプローチのみです。Executors.newSingleThreadExecutor()またはscheduledThreadPoolExecutor()を使用する場合、shutdown()メソッドとshutdownNow()メソッドを使用して、プロデューサーまたはコンシューマーのいずれかを停止できます。TimerTaskは、ConcurrentQueueの動作を確認するための良い例ですが、実動システムでは使用されません。
編集
プロデューサースレッドに汎用機能を追加します。コンストラクターは、キューにアイテムを追加するメソッドを実装するテンプレートクラスを取得するようになりました。プロデューサーがキューにアイテムを追加するたびに呼び出されるaddItem()メソッドを含む抽象クラスPASSWORDを定義しました。
import java.util.Date;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class ConsumerTask<T> extends TimerTask {
Timer timer;
ConcurrentLinkedQueue<T> itemQueue;
AtomicLong count = new AtomicLong(0);
final long limit;
public ConsumerTask(ConcurrentLinkedQueue<T> itemQ, long lim, int seconds) {
limit = lim;
timer = new Timer();
timer.scheduleAtFixedRate(this, new Date(), seconds * 1000);
itemQueue = itemQ;
}
public void run() {
T item = itemQueue.peek();
if (item != null) {
if (count.incrementAndGet() <= limit) {
System.out.println("Extracting Item : " + itemQueue.poll());
} else {
System.out
.println("Consumed : " + (count.get() - 1) + " items");
timer.cancel();
}
}
}
public static void main(String args[]) throws InterruptedException {
ConcurrentLinkedQueue<Integer> itemQ = new ConcurrentLinkedQueue<Integer>();
ConsumerTask<Integer> ct = new ConsumerTask<Integer>(itemQ, 10, 1);
new Thread(new Producer<Integer>(itemQ, new IntegerAddItem(itemQ), 20))
.start();
new Thread(ct).start();
}
}
abstract class AddItem<T> {
ConcurrentLinkedQueue<T> itemQ;
T t;
public AddItem(ConcurrentLinkedQueue<T> itemQ) {
this.itemQ = itemQ;
}
abstract boolean addItem();
public boolean addItem(T t) {
return itemQ.add(t);
}
}
class IntegerAddItem extends AddItem<Integer> {
public IntegerAddItem(ConcurrentLinkedQueue<Integer> itemQ) {
super(itemQ);
}
AtomicInteger item = new AtomicInteger(0);
@Override
boolean addItem() {
return addItem(item.incrementAndGet());
}
}
class Producer<T> implements Runnable {
private final ConcurrentLinkedQueue<T> itemQueue;
AtomicInteger item = new AtomicInteger(0);
AtomicLong count = new AtomicLong(0);
AddItem<T> addMethod;
final long limit;
public Producer(ConcurrentLinkedQueue<T> itemQ, AddItem<T> addMethod,
long limit) {
itemQueue = itemQ;
this.limit = limit;
this.addMethod = addMethod;
}
public void run() {
while (count.getAndIncrement() < limit) {
addMethod.addItem();
try {
Thread.sleep(new Random().nextInt(5000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
Thread.currentThread().interrupt();
}
}
}
}