0

I am trying to implement Bully Algorithm in Java using threads.
Here is the code which I have written.

package newbully;

public class NewBully {

    public static void main(String[] args) {
        int total_processes = 4;
        Thread1[] t = new Thread1[total_processes];
        for (int i = 0; i < total_processes; i++) {
            t[i] = new Thread1(new Process(i+1, i+1), total_processes);
        }
        try {
            Election.initialElection(t);
        } catch (Exception e) {
            System.out.println("Possibly you are using null references in array");
        }
        for (int i = 0; i < total_processes; i++) {
            new Thread(t[i]).start();
        }
    }
}

package newbully;

public class Election {

    private static boolean pingFlag = false;
    private static boolean electionFlag = false;
    private static boolean messageFlag = false;

    public static boolean isMessageFlag() {
        return messageFlag;
    }

    public static void setMessageFlag(boolean messageFlag) {
        Election.messageFlag = messageFlag;
    }

    public static boolean isPingFlag() {
        return pingFlag;
    }

    public static void setPingFlag(boolean pingFlag) {
        Election.pingFlag = pingFlag;
    }

    public static boolean isElectionFlag() {
        return electionFlag;
    }

    public static void setElectionFlag(boolean electionFlag) {
        Election.electionFlag = electionFlag;
    }

    public static void initialElection(Thread1[] t) {
        Process temp = new Process(-1, -1);
        for (int i = 0; i < t.length; i++) {
            if (temp.getPriority() < t[i].getProcess().getPriority()) {
                temp = t[i].getProcess();
            }
        }
        t[temp.pid - 1].getProcess().CoOrdinatorFlag = true;
    }
}

package newbully;

public class Process {

    int pid;
    boolean downflag,CoOrdinatorFlag;

    public boolean isCoOrdinatorFlag() {
        return CoOrdinatorFlag;
    }

    public void setCoOrdinatorFlag(boolean isCoOrdinator) {
        this.CoOrdinatorFlag = isCoOrdinator;
    }
    int priority;

    public boolean isDownflag() {
        return downflag;
    }

    public void setDownflag(boolean downflag) {
        this.downflag = downflag;
    }

    public int getPid() {
        return pid;
    }

    public void setPid(int pid) {
        this.pid = pid;
    }

    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public Process() {
    }

    public Process(int pid, int priority) {
        this.pid = pid;
        this.downflag = false;
        this.priority = priority;
        this.CoOrdinatorFlag = false;
    }
}

package newbully;

import java.util.*;
import java.io.*;
import java.net.*;

public class Thread1 implements Runnable {

    private Process process;
    private int total_processes;
    ServerSocket[] sock;
    Random r;

    public Process getProcess() {
        return process;
    }

    public void setProcess(Process process) {
        this.process = process;
    }

    public Thread1(Process process, int total_processes) {
        this.process = process;
        this.total_processes = total_processes;
        this.r = new Random();
        this.sock = new ServerSocket[total_processes];
    }

    private void recovery() {
    }

    synchronized private void pingCoOrdinator() {
        try {
            if (Election.isPingFlag()) {
                wait();
            }
            if (!Election.isElectionFlag()) {
                Election.setPingFlag(true);
                System.out.println("Process[" + this.process.getPid() + "]: Are you alive?");
                Socket outgoing = new Socket(InetAddress.getLocalHost(), 12345);
                outgoing.close();
                Election.setPingFlag(false);
                notifyAll();
            }
        } catch (Exception ex) {
            //Initiate Election
            System.out.println("process[" + this.process.getPid() + "]: -> Co-Ordinator is down\nInitiating Election");
            Election.setElectionFlag(true);
            Election.setPingFlag(false);
            notifyAll();
        }
    }

    synchronized private void executeJob() {
        int temp = r.nextInt(20);
        for (int i = 0; i <= temp; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                System.out.println("Error Executing Thread:" + process.getPid());
                System.out.println(e.getMessage());
            }
        }
    }

    synchronized private boolean sendMessage() {
        boolean response = false;
        int i = 0;
        try {
            if (Election.isMessageFlag()) {
                wait();
            }
            Election.setMessageFlag(true);

            for (i = this.process.getPid() + 1; i <= this.total_processes; i++) {
                try {
                    Socket electionMessage = new Socket(InetAddress.getLocalHost(), 10000 + i);
                    System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "]  responded to election message successfully");
                    electionMessage.close();
                    response = true;
                } catch (Exception ex) {
                    System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] did not respond to election message");
                }
            }
            Election.setMessageFlag(false);
            notifyAll();
        } catch (Exception ex1) {
            System.out.println(ex1.getMessage());
        }

        return response;
    }

    synchronized private void serve() {
        try {
            //service counter
            ServerSocket s = new ServerSocket(12345);
            for (int counter = 0; counter <= 10; counter++) {
                Socket incoming = s.accept();
                System.out.println("Process[" + this.process.getPid() + "]:Yes");
                Scanner scan = new Scanner(incoming.getInputStream());
                PrintWriter out = new PrintWriter(incoming.getOutputStream(), true);
                if (scan.hasNextLine()) {
                    if (scan.nextLine().equals("Who is the co-ordinator?")) {
                        System.out.print("Process[" + this.process.getPid() + "]:");
                        out.println(this.process);
                    }
                }
                if (counter == 10) {//after serving 10 requests go down
                    this.process.setCoOrdinatorFlag(false);
                    this.process.setDownflag(true);
                    try {
                        incoming.close();
                        s.close();
                        sock[this.process.getPid() - 1].close();
                        Thread.sleep((this.r.nextInt(10) + 1) * 50000);//going down
                        recovery();
                    } catch (InterruptedException e) {
                        System.out.println(e.getMessage());
                    }
                }
            }
        } catch (IOException ex) {
            System.out.println(ex.getMessage());
        }
    }

    @Override
    public void run() {
        try {
            sock[this.process.getPid() - 1] = new ServerSocket(10000 + this.process.getPid());
        } catch (IOException ex) {
            System.out.println(ex.getMessage());
        }
        while (true) {
            if (process.isCoOrdinatorFlag()) {
                //serve other processes
                serve();
            } else {
                while (true) {
                    //Execute some task
                    executeJob();

                    //Ping the co-ordinator
                    pingCoOrdinator();

                    if (Election.isElectionFlag()) {
                        if (!sendMessage()) {//elect self as co-ordinator
                            System.out.println("New Co-Ordinator: Process[" + this.process.getPid() + "]");
                            this.process.setCoOrdinatorFlag(true);
                            Election.setElectionFlag(false);
                            break;
                        }
                    }
                }
            }
        }
    }
}

When I am trying to execute the code out of the 4 threads which I have created some threads are waiting premanently using wait() call. They are not being notified by notifyAll(). Can anyone suggest why this is happening?

4

2 に答える 2

7

Each thread is calling wait() on itself (on its own Thread1 instance). That means that when you call notifyAll() on that same Thread1 instance, only the single Thread1 that is waiting it will be notified, and not all the other threads.

What you have to do is make all your Thread1 objects call wait() on a single, common object, and also call notifyAll() on that same object.

Ofcourse you have to synchronize on the common object when you call wait() or notifyAll() on it; if you don't do that, you'll get an IllegalMonitorStateException.

// Object to be used as a lock; pass this to all Thread1 instances
Object lock = new Object();

// Somewhere else in your code
synchronized (lock) {
    lock.wait();
}

// Where you want to notify
synchronized (lock) {
    lock.notifyAll();
}
于 2012-04-18T12:28:19.200 に答える
3

Both notify() (or notifyAll()) and wait() must be written into synchronized block on the same monitor.

For example:

synchronized(myLock) {
    wait();
}

..................

synchronized(myLock) {
    notifyAll();
}
于 2012-04-18T12:22:53.227 に答える