2

私は2つのモジュールを持つクライアントサーバーアプリケーションに取り組んでいます:1)。レジストリ: ネットワーク内のすべてのサーバーは、リッスンしている IP とポートをレジストリに登録します。2)。メッセージング ノード: これらはサーバーでもあり、レジストリからコマンドを受け取ると、一連のメッセージで相互に通信します。メッセージング ノードは一度に 1 つのノードにのみメッセージを送信しますが、複数のノードからメッセージを受信する場合があります。

レジストリは、利用可能な (登録された) ノードのリストを他のすべてのサーバーに送信します。このリストを受信すると、ノードは互いにメッセージの交換を開始します。

レジストリ部分を機能させることができ、ノードは自分自身を登録できます。また、ノードはレジストリからノード リストを正常に受信し、メッセージの交換を開始できます。ただし、メッセージング ノードは、送信されるすべてのメッセージを受信することはできません。

メッセージング ノードの部分は次のとおりです。

public class MessagingNode  {

    // Constructor
    public MessagingNode(String registryHost, int registryPort)  {
        try  {
            registryHostname = InetAddress.getByName(registryHost).getHostAddress();
        } catch (Exception e)  {
            System.out.println("Can't resolve Registry hostname!");
        }
        registryPortNumber = registryPort;

        // Start the node server
        try  {
            nodeServerSocket = new ServerSocket(0);
        }
        catch (Exception e)  {
            System.out.println("Can't start node server!");
        }

        // Store the MessagingNode server port and IP address
        try  {
            nodeIpAddress = InetAddress.getLocalHost().getHostAddress();
        } catch(Exception e)  {
            System.out.println("Can't get localhost");
        }
        nodePortNumber = nodeServerSocket.getLocalPort();
        //System.out.println("Node Port:"+nodePortNumber);

        // Initialize trackers
        sendTracker = 0;
        receiveTracker = 0;
        sendSummation = 0;
        receiveSummation = 0;
        sending = false;
    }



    // Start the NodeServer to listen for other nodes
    public void startNodeServer()  {
        if(!isRunning)  {
            isRunning = true;
            consoleListener();
            while(MessagingNode.this.isRunning)  {
                Socket clientSocket = null;
                try  {
                    clientSocket = nodeServerSocket.accept();
                    openClient(clientSocket);                       

                } 
                catch(IOException e)  {
                    e.printStackTrace();
                }

            }
        }
    }


    // Messaging nodes receiving 
    public void openClient(final Socket socket)  {
        Thread clientThread = new Thread()  {
            public void run()  {
                int count = 0;
                try  { 
                    byte[] messagePayload = new byte[64];

                    InputStream in = socket.getInputStream();
                    DataInputStream din = new DataInputStream(in);
                    //BufferedReader br =  new BufferedReader(new InputStreamReader(in));   
                    while(din.read(messagePayload) > -1)  {

                        //count++;

                        int type = InterpretMessage.getMessageType(messagePayload);
                        if(type == MessageTypes.MESSAGING_NODES_LIST)  {
                            trimNodeList(messagePayload);
                            System.out.println("Node list Received!");
                            dataThread();

                        } 
                        else  {         
                            handleIncomingPayload(messagePayload);
                        }
                    }
                    //System.out.println("Input connection closed after :" + count);
                //System.out.println("Listener Closed!");
                //in.close();
                //socket.close();

                }
                catch(Exception e)  {
                    e.printStackTrace();
                }


            }
        };
        clientThread.start();
        try  {
                    clientThread.join();
                }
                catch(Exception e)  {
                    e.printStackTrace();
                }


    }

    public void dataThread()    {
        Thread t = new Thread()  {
            public void run()  {
                sendData();
            }
        };
        t.start();
        try  {
            t.join();
        }
        catch(Exception e)  {
            e.printStackTrace();
        }
    }

    public void sendData()  {
        // pick a node
        int size = messagingNodesList.size();
        int index = (int)(Math.random() * size);
        //System.out.println("Node being contacted:" + messagingNodesList.get(index));
        String details = messagingNodesList.get(index);
        String[] str = details.split(" ");


        try  {
            Socket socket = new Socket(str[0], Integer.parseInt(str[1]));
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            for(int i= 0; i < 5; i++)  {
                CommMessage outgoing = new CommMessage();
                byte[] msg = outgoing.marshall();
                out.write(msg, 0, msg.length);
                System.out.println("Sending number:" + outgoing.number);
                out.flush();
            }
            out.close();
            //closeConnection = true;
        }
        catch(IOException e)  {
            e.printStackTrace();
        }
    }

    // Remove Node's information
    public void trimNodeList(byte[] incoming)  {
    // Removes the current node's information from the list recieved from registry
    }

    // Update trackers
    public void handleIncomingPayload(byte[] payload)  {
        CommMessage msg = new CommMessage(payload);
        msg.unmarshall();
        System.out.println("Receiving Number:"+msg.number);
        //synchronized(this)  {
            receiveSummation += msg.number;
            receiveTracker += 1;
        //}
    }


    // Console Listener
    public void consoleListener()  {
        Thread listener = new Thread()  {
            public void run()  {
                // listen for keyboard instructions
            }
        };
        listener.start();
    }



    public static void main(String[] args)  {
        MessagingNode node = new MessagingNode(args[0], Integer.parseInt(args[1]));
        node.connectRegistry();
        node.startNodeServer();
        //node.deregisterNode();
        //node.deregisterNode();
        //node.sendBurst();
        // Launch the console Listener Thread
        // Initiate rounds of messaging

    }
}

問題は、私が想定している受信部分のどこかにあります。誰かが同時実行の問題である可能性があると指摘したため、 sendData メソッドで同期を使用してみましたが、役に立ちませんでした。まだすべてのメッセージを受信できていません。どんな洞察も本当に役に立ちます。

ありがとう。

4

0 に答える 0