0

Java のミルティスレッドについて質問があります。ジェネレーター、バッファー、シンクがあるリンクをシミュレートしています。すべては、ソケットを使用してそれらの間で通信する個別のプログラムです。それはうまくいっていますが、バッファに少し問題があります。次の方法で完全優先キューをシミュレートしたいのですが、優先度の高いキューと優先度の低いキューが 2 つあるため、2 つのバッファーがあります。BufferContainer 内にこの 2 つがあり、ボットはそれぞれ別のジェネレーターからデータを受け取るため、スレッドが分離されています。問題は、優先度に応じてどのパケットをシンクに送信するかを決定するスケジューラが 1 つしかないことです。動作しているようですが、プログラムを停止するのに問題があります。バッファ コンテナは、2 つの異なるバッファとスケジューラを生成します。これらは 3 つの異なるスレッドであり、スケジューラーがタスクをいつ終了するかを知る必要があります。しかし、私はそれを行う方法がわかりません。それが私が試したことです:

    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;


    public class BufferContainer {

    private Buffer buffer0;
    private Buffer buffer1;
    private Scheduler scheduler;
    private Queue[] queue;
    private Queue q1;

    public BufferContainer (int inPort0, int outPort, long queueSize0, double serviceRate0, 
            int inPort1, long queueSize1, double serviceRate1){
        try {
            /*Some code here*/

            //Scheduler separated thread
            this.scheduler = new Scheduler(this.queue, serviceRate0, 
                   serviceRate1, outPort, outLog0, outLog1);
            //start the scheduler
            this.scheduler.start();

            //Buffers separated threads
            this.buffer0 = new Buffer(inPort0, queueSize0, this.queue[0], outLog0); 
            this.buffer1 = new Buffer(inPort1, queueSize1, this.queue[1], outLog1); 
            //start the buffers
            this.buffer0.start();
            this.buffer1.start();

            /*Some code here*/

            while (true){
                if(this.scheduler.schedulerTerminated()){
                        //Some code here
                        this.scheduler.stop();
                        break;
                    }
                    /*Some code here*/
                }
            }
            //stop Buffers
            this.buffer0.stop();
            this.buffer1.stop();

            /*Some code here*/
        } catch (IOException ex) {
        }
    }

    public static void main(String[] args){
           BufferContainer buf = new BufferContainer(Integer.parseInt(args[0]), Integer.parseInt(args[1]),
                Long.parseLong(args[2]), Double.parseDouble(args[3]), Integer.parseInt(args[4]), 
                Long.parseLong(args[5]), Double.parseDouble(args[6]));
    }
}

スケジューラーがいつ終了するかを知る必要があるだけで、スケジューラー内にブール値があり、それが発生したときに true に設定されます。外部で while(true) を使用して、この変数を継続的にチェックし、必要に応じてスケジューラーを停止しようとしますが、動作していません。スケジューラから buffercontainer に通知する方法についてのアイデアはありますか?

アップデート

揮発性ソリューションをテストしましたが、機能しません。私が思うに、スケジューラーの実行中に変数をチェックできない可能性があります。これは私のスケジューラがどのように見えるかです:

import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Random;

public class Scheduler implements Runnable{

    Thread th;
    private double serviceRateHigh, serviceRateLow;
    private DataOutputStream wr;
    private BufferedWriter log0, log1;
    private Socket socket;
    private String packet;
    private Queue highPrioQueue, lowPrioQueue;
    //Es possible que es mdifiqui per diferents threads
    private volatile boolean shouldStop, schedulerTerminated, buffer0Terminated, buffer1Terminated;

    public Scheduler(Queue[] queue, double serviceRateHigh, double serviceRateLow, 
            int port, BufferedWriter log0, BufferedWriter log1){

        //Creem el thread
        this.th = new Thread(this);
        //Guardem les dues cues
        this.setQueues(queue);
        this.log0 = log0;
        this.log1 = log1;
        //Guardem els dos service rate
        this.serviceRateHigh = serviceRateHigh; 
        this.serviceRateLow = serviceRateLow;
        this.shouldStop = false;
        this.schedulerTerminated = false;
        this.buffer0Terminated = false;
        this.buffer1Terminated = false;
        //Socket y buffer de escriptura
        try {
            this.socket = new Socket(InetAddress.getByName("127.0.0.1"), port);
            this.wr = new DataOutputStream(this.socket.getOutputStream());
        } catch (IOException e) {
        }
    }

    //Iniciar el thread
    public void start(){
        this.th.start();
    }

    //Parar el thread
    public void stop(){
        this.shouldStop = true;
    }
    //When both buffer have notified we can terminate the scheduler
    public synchronized boolean schedulerTerminated(){
        if (this.buffer0Terminated && this.buffer1Terminated)
            this.schedulerTerminated = true;

        return this.schedulerTerminated;
    }

    public synchronized void setBufferTerminated(int n){
        if(n == 0) this.buffer0Terminated = true;
        if(n == 1) this.buffer1Terminated = true;
    }

    //Assigna les cues de alta i baixa prioritat
    private void setQueues(Queue[] queue){
        char prio;
        for (int i = 0; i < queue.length; i++){
            prio = queue[i].getPrio();
            if (prio == 'H')
                this.highPrioQueue = queue[i];
            else{ 
                if (prio == 'L')
                    this.lowPrioQueue = queue[i];
            }      
        }
    }

    //Notifica al thread per a que es desperti
    synchronized public void resume(){
        notify();
    }

    protected void sendLastPacketNumberToSink(int pl0, int pl1) {           
        String datagram = "BYE" + " " + pl0 + " " + pl1;
            try {
                wr.writeBytes(datagram + '\n');
            }catch (IOException e) {}
    }

    //Proces principal del thread
    @Override
    public void run(){
        while(!shouldStop){
            //Creem un generador de nombres aleatoris
            Random ran = new Random();
            //generem un nombre aleatori entre 0 i 1
            double uniform = ran.nextDouble();

            if(!this.highPrioQueue.isEmpty()){ //Cua de alta prioritat NO buida   
                //convertim a exponencial
                double sleeptime = Math.log(1-uniform)/(-this.serviceRateHigh);
                try{
                    this.th.sleep(Math.round(sleeptime*1000));
                }catch(InterruptedException e){
                }
                    //Enviem el paquet de la cua de alta prioritat
                try {
                    if ((this.packet = this.highPrioQueue.nextPacket()) != null){
                            this.wr.writeBytes(this.packet + '\n');
                            log0.write("\nScheduler: Paquet [" + packet + "] enviat desde la cua high\n");
                            log1.write("\nScheduler: Paquet [" + packet + "] enviat desde la cua high\n");
                        } 
                }catch (IOException e) {
                }
            }
            else { 
                if (!this.lowPrioQueue.isEmpty()){//Cua de alta prioritat buida
                    //convertim a exponencial
                    double sleeptime = Math.log(1-uniform)/(-this.serviceRateLow);
                    try{
                        this.th.sleep(Math.round(sleeptime*1000));
                    }catch(InterruptedException e){
                    }
                    //Enviem el paquet de la cua de baixa prioritat
                    try {
                        if ((this.packet = this.lowPrioQueue.nextPacket()) != null){
                                this.wr.writeBytes(this.packet + '\n');
                                log0.write("\nScheduler: Paquet [" + packet + "] enviat desde la cua Low\n");
                                log1.write("\nScheduler: Paquet [" + packet + "] enviat desde la cua Low\n");
                        }
                    }catch (IOException e) {
                    }
                }
            }
        }   
    }
}

run メソッドが現在実行されている間に、スケジューラでメソッドを呼び出すことができるかどうかはわかりません。その場合、エラーが発生します。

ありがとうございました!

4

0 に答える 0