0

サーバーが間違ったユーザーにメッセージを送信することがあるため、アプリを同期したいと思います。同期ブロックを使用してキューを同期していますが、ソリューションが機能しません。ユーザーが自分宛てではないメッセージを受信することがあります。

コード(server.java)は次のとおりです。(InWorker-ユーザーからのメッセージの受信、OutWorker-ユーザーへのメッセージの送信)すべてのユーザーには独自のクラス(スレッド)があります- MiniServer(2つのスレッドを含む:InWorkerOutWorker)。

    class InWorker implements Runnable{

 String slowo=null;
 ObjectOutputStream oos;
 ObjectInputStream ois;
 ConcurrentMap<String,LinkedBlockingQueue<Message>> map=new ConcurrentHashMap<String, LinkedBlockingQueue<Message>>();
 Message message=null;

InWorker(ObjectInputStream ois,ConcurrentMap<String,LinkedBlockingQueue<Message>> map) {
    this.ois=ois;
    this.map=map;
}

public void run() {

    while(true) {
            //synchronized(queue) {
        try {
            message = (Message) ois.readObject();
            slowo=message.msg;
            if(slowo!=null && !slowo.equals("Bye")) {
                        if(!map.containsKey(message.id)) {
                            map.putIfAbsent(message.id, new LinkedBlockingQueue<Message>());
                        try {
                            map.get(message.id).put(message);
                        } catch (InterruptedException ex) {
                            Logger.getLogger(Communicator.class.getName()).log(Level.SEVERE, null, ex);
                        }
                        }
                        else
                        {
                        try {
                            map.get(message.id).put(message);
                        } catch (InterruptedException ex) {
                            Logger.getLogger(Communicator.class.getName()).log(Level.SEVERE, null, ex);
                        }
                        }
                        }

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
        e.printStackTrace();
        }
            //}
        Thread.yield();
        } 
}
}

class OutWorker implements Runnable{

String tekst=null;
ObjectOutputStream oos=null;
String id;
Message message;
ConcurrentMap<String,LinkedBlockingQueue<Message>> map=new ConcurrentHashMap<String, LinkedBlockingQueue<Message>>();

OutWorker(ObjectOutputStream oos,String id,ConcurrentMap<String,LinkedBlockingQueue<Message>> map) {
    this.oos=oos;
    this.id=id;
    this.map=map;
}

public void run() {
    while(true) {
            //synchronized(queue) {
                if(map.containsKey(id)) {
                while(!map.get(id).isEmpty()) {
                        try {
                            message=map.get(id).take();
                        } catch (InterruptedException ex) {
                            Logger.getLogger(OutWorker.class.getName()).log(Level.SEVERE, null, ex);
                        }
                        try {
                            oos.writeObject(message);
                            oos.flush();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                }

                }
            //}
        Thread.yield();
}}}

MiniServerおよびServerクラスは次のとおりです。

class MiniSerwer implements Runnable{

    Socket socket=null;
    ExecutorService exec=Executors.newCachedThreadPool();
    ObjectOutputStream oos=null;
    ObjectInputStream ois=null;
    String id;
    Queue<Message> queue=new LinkedList<Message>();

    MiniSerwer(ObjectOutputStream oos,ObjectInputStream ois,String id,Queue<Message> queue) {
        this.oos=oos;
                this.ois=ois;
        this.id=id;
        this.queue=queue;
    }

    public void run() {
            exec.execute(new InWorker(ois,queue)); // input stream
            exec.execute(new OutWorker(oos,id,queue)); //output stream
            Thread.yield();
    }
}

public class Serwer implements Runnable{

ServerSocket serversocket=null;
ExecutorService exec= Executors.newCachedThreadPool();
int port;
String id=null;
Queue<Message> queue=new LinkedList<Message>();
BufferedReader odczyt=null;

ObjectInputStream ois=null;
Message message=null;
ObjectOutputStream oos=null;

Serwer(int port) {
    this.port=port;
}

public void run() {
    try {
        serversocket=new ServerSocket(port);
        while(true) {
            Socket socket=null;
            try {
                socket = serversocket.accept();                                
                                /* first message is login*/
                                oos=new ObjectOutputStream(socket.getOutputStream());
                                oos.flush();
                                ois=new ObjectInputStream(socket.getInputStream());
                                message = (Message) ois.readObject();
                                id=message.sender;
                                System.out.println(id+" log in to the server");

                                exec.execute(new MiniSerwer(oos,ois,id,queue)); // create new thread
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
                        catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }    
        }
    } catch (IOException e) {
        e.printStackTrace();
}
}

public static void main(String[] args) {
    int port;
        port=8821;
    ExecutorService exec=Executors.newCachedThreadPool();
    exec.execute(new Serwer(port));
}

誰かが私を助けることができますか?

編集:キューをConcurrentHashMapに変更しましたが、メッセージが間違ったユーザーに送信されることがあります。なんで ?

4

1 に答える 1

5

これは、古典的な生産者/消費者シナリオです。同期されたブロックを破棄し、BlockingQueue(InWorker呼び出しput()とOutWorker呼び出しtake())を使用します。

また、Serverクラスでは、すべての接続で同じキューを共有するのではなく、接続ごとに新しいキューを作成する必要があります。

于 2013-02-20T17:09:20.067 に答える