3

私は次のようにObserver&Runnableを実装するクラスを持っています(この例は不器用かもしれないことを知っています):

public class Triage implements Observer,Runnable{
    Observable obsrvbl;
    private BlockingQueue<String> messages; 
    volatile static boolean interrupted=false;  
    double updated;

    Triage(Observable obsrvbl, BlockingQueue messages){
    this.obsrvbl=obsrvbl;
    this.messages = messages;
    obsrvbl.addObserver(this);
    }

    public void update(Observable o, Object arg){
        updated += ((Double)arg).doubleValue();
        System.out.println("updated");
    }

    public void run(){
        String msg; 
        while(!interrupted){
            msg=messages.take();
            if(msg!=null){    
                //do something with message         
            }
        }
    }
}

ピークされているキューは、ObservableがnotifyObservers()を呼び出すと同時に入力されます。キューに何もない場合、update()はオブザーバーで正常に呼び出されますが、キューに処理するメッセージがある場合、update()は呼び出されません。これは予想される動作ですか?

私はこれを見ましたが、それは別の問題のようです。

そして、これがObservableです-やや工夫されています:

public class Producer extends Observable implements Runnable {
    volatile static boolean interrupted=false;
    private BlockingQueue<String> quotes;
    Producer(BlockingQueue quotes){
        this.quotes=quotes; 
    }
    public void run(){
        String msg;     
        while(!interrupted){    
            msg=quotes.take();
            if(msg!=null){
                setChanged();
                notifyObservers(Double.valueOf(3.0));   
            }
        }
    }
}
4

1 に答える 1

1

これの何が問題だったのかを突き止めました-私の側の見落とし。@slim がコメントで示唆したように、2 つのキューは同じであったため、Triage の run() は Producer の run() の前にメッセージを消費していたため、通知されませんでした。とにかく fwiw - ここに完全な動作例があります:

public class Triage implements Observer,Runnable{
    Observable obsrvbl;
    private BlockingQueue<String> messages; 
    volatile static boolean interrupted=false;  
    Integer updated = 0;
    private static Random rand = new Random(47);

    Triage(Observable obsrvbl, BlockingQueue messages){
        this.obsrvbl=obsrvbl;
        this.messages = messages;
        obsrvbl.addObserver(this);
    }

    public void update(Observable o, Object arg){
        updated += ((Integer)arg);
        System.out.println("Updated: " + updated);
    }

    public void run(){
        String msg; 
        while(!interrupted){
            try{
                msg=messages.take();
                System.out.println("Run: " + msg);
            }catch(InterruptedException ie){}
        }
    }

    public static void main(String[] args){
        BlockingQueue<String> q1 = new LinkedBlockingQueue<String>();
        BlockingQueue<String> q2 = new LinkedBlockingQueue<String>();
        Producer p = new Producer(q1);
        new Thread(p).start();
        new Thread(new Triage(p,q2)).start();
        for(int i=0;i<20;i++){      
            int next = rand.nextInt(10)*500;
            System.out.println("Populating: " + next);
            q1.add((Integer.valueOf(next)).toString());
            q2.add((Integer.valueOf(next)).toString());         
        }
    }
}

class Producer extends Observable implements Runnable {
    volatile static boolean interrupted=false;
    private BlockingQueue<String> quotes;
    Producer(BlockingQueue quotes){
        this.quotes=quotes; 
    }
    public void run(){
        String msg;     
        while(!interrupted){    
            try{
                msg=quotes.take();
                if(msg!=null){
                    System.out.println("Notifying: " + msg);
                    setChanged();
                    notifyObservers(Integer.valueOf(msg));  
                }
            }catch(InterruptedException ie){}
        }
    }
}
于 2012-09-27T10:56:17.110 に答える