0

同期を適切に使用していないと思います。以下の出力が得られます。

BlockingQueue または Java 5 の同時実行機能を使用しないことを意識的に選択しました。これを書いたのは、同期といくつかの基本を学べるようにするためです。

プロデューサー スレッド: PRODUCER-1 アイテム 0-Name-0 をキューに追加
コンシューマー スレッド CONSUMER-2 アイテムを処理: 0-Name-0
プロデューサー スレッド: PRODUCER-2 アイテム 1-Name-1 をキューに追加

どこが間違っているのか理解してもらえますか?

public class ProducerConsumerManager {

public static void main(String args[]){

    ItemQueue itemQueue = new ItemQueue();

    Producer producer1 = new Producer(itemQueue,15, 500);
    Producer producer2 = new Producer(itemQueue,15, 1000);
    Consumer consumer1 = new Consumer(itemQueue,500);
    Consumer consumer2 = new Consumer(itemQueue,1500);

    Thread producerThread1 = new Thread(producer1,"PRODUCER-1");
    Thread producerThread2 = new Thread(producer2,"PRODUCER-2");
    Thread consumerThread1 = new Thread(consumer1,"CONSUMER-1");
    Thread consumerThread2 = new Thread(consumer2,"CONSUMER-2");

    producerThread1.start();
    producerThread2.start();

    consumerThread1.start();
    consumerThread2.start();


    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        System.out.println("The MAIN THREAD has been INTERRUPTED");
    }


}
}


 public class Consumer implements Runnable{

private ItemQueue itemQueue;
private int waitTimeInMillis;
public Consumer(ItemQueue queue, int waitTimeInMillis){
    itemQueue = queue;
    this.waitTimeInMillis = waitTimeInMillis;
}

private boolean processItem(Item item){     
    if(item == null){
        System.out.println("Consumer Thread cannot process as Item is null");
        return false;
    }               
    return true;
}

public void run() {
    synchronized(itemQueue){
        try {
        if(itemQueue.hasMoreItems()){
            Item item = itemQueue.getNextItem();
            System.out.println("Consumer Thread "+ Thread.currentThread().getName() + " processing item: " +  item.getItemNo() + "-" + item.getItemName());

            processItem(item);              
                Thread.sleep(waitTimeInMillis);

        }else{

                itemQueue.wait();
            }} catch (InterruptedException e) {
                System.out.println("Consumer Thread INTERRUPTED");                  
            }

    }               
}

}


  public class Producer implements Runnable{

private ItemQueue itemQueue;
private int maxCount;
private int waitTimeInMillis;
public Producer(ItemQueue queue, int maxItems, int waitTimeInMillis){
    itemQueue = queue;  
    this.maxCount = maxItems;
    this.waitTimeInMillis = waitTimeInMillis;
}

public void run() { 
    synchronized(itemQueue){
        try {
        if(itemQueue.queueCount()>=maxCount){

                itemQueue.wait();               
        }
        else{
            produceNewItem();
            Thread.sleep(waitTimeInMillis);
        }
        } catch (InterruptedException e) {
            System.out.println("Producer Thread INTERRUPTED");
        }
    }       
}

private boolean produceNewItem(){
    Item  item = null;
    synchronized(ItemService.class){
        item = ItemService.getNextItem();       
    System.out.println("Producer Thread: " + Thread.currentThread().getName() + " adding item " + item.getItemNo() +"-"+item.getItemName()+"  to queue");
    itemQueue.addItem(item);
    return true;
}
}
}


  import java.util.LinkedList;

  public class ItemQueue {

private LinkedList<Item> itemList = new LinkedList<Item>();

public void addItem(Item item){
    itemList.add(item);
}

public Item getNextItem(){
    return itemList.poll();
}

public boolean hasMoreItems(){
    return  !itemList.isEmpty();
}

public int queueCount(){
    return itemList.size();
}
}


   public class Item {

private String itemName;
private int itemNo;
private String itemDescription;

public String getItemName() {
    return itemName;
}
public void setItemName(String itemName) {
    this.itemName = itemName;
}
public int getItemNo() {
    return itemNo;
}
public void setItemNo(int itemNo) {
    this.itemNo = itemNo;
}
public String getItemDescription() {
    return itemDescription;
}
public void setItemDescription(String itemDescription) {
    this.itemDescription = itemDescription;
}

public Item (int no, String name, String desc){
    itemName = name;
    itemNo = no;
    itemDescription = desc;
}
}


   import java.util.LinkedList;

  public class ItemService {

static LinkedList<Item> itemList = new LinkedList<Item>();
static int counter =0;

static{
    Item item = null;
    for(int i=0;i<10000;i++){
        item = new Item(i, "Name-"+i, "Description for item " + i);
        itemList.add(item);
    }

}

public static Item getNextItem(){
    if(counter < 9999){
        Item item= itemList.get(counter);
        counter++;
        return item;
    }
    else
    {
        System.out.println("Cannot PRODUCE any further items. all exhausted");
        return null;
    }

}

}
4

2 に答える 2

1

どの問題があるか、つまり、得られる出力ではなく何を期待するかについてはまだ述べていませんが、コードには 2 つの重要な問題があります。

  1. どちらの種類のスレッドも itemQueue で待機します (満杯のため、または空のため) が、コード内のどこにもnotify()notifyAll()待機中のスレッドをウェイクアップするために呼び出される場所はありません。これは必然的に飢餓につながります。プロデューサーがアイテムをキューに入れるとき、notifyAll()待っているコンシューマーを起こすために呼び出す必要があります。コンシューマーがキューからアイテムを削除するとき、notifyAll()待機中のプロデューサーをウェイクアップするために呼び出す必要があります。
  2. wait() メソッドは、スレッドが起動されたときにスレッドが本当に続行できるかどうかを確認するループ内で常に作成する必要があります。の javadoc を読んでくださいObject.wait()

それほど重要ではないもう 1 つの問題は、各スレッドに同期とwait()/を強制的に実装するのではなくnotifyAll()、これをすべてキュー内にカプセル化する必要があることです。スレッドはアイテムを取得してキューに入れるだけで、可能な限りキューによってブロックされます。つまり、BlockingQueue を再実装する必要があります。

于 2012-06-23T17:15:17.893 に答える
0

あなたが間違っている点の 1 つ (問題の理由ではないかもしれません) は、プロデューサー/コンシューマー モデルでは、プロデューサーの前にコンシューマーを開始/実行する必要があることです。

于 2012-06-25T14:50:51.590 に答える