4

私は経験豊富な Java プログラマーではないので、これが初心者の質問である場合はご容赦ください。

大きく 3 つのモジュールで構成されるシステムを設計しています。クライアント、サーバー、およびアプリケーション。アイデアは、クライアントがサーバーにメッセージを送信することです。サーバーは、アプリケーションでユース ケースをトリガーします。ユースケースの結果はサーバーに返され、サーバーはその結果をクライアントに送信します。私がこのアーキテクチャを選択したのは、一度に複数のクライアントをサポートする必要があると予想しているためです。サーバー モジュールを他のアプリケーションで再利用できるようにしたいのです。可能な限りドメイン ロジックを実装し、より高度な Java を学習する機会があるためです。

さまざまなモジュールをキューで結び付けることを計画しています。クライアントは十分に単純です。メッセージを発行し、応答が到着するまでブロックします (単純化しすぎている可能性がありますが、現時点では妥当なモデルです)。アプリケーションも同様に問題ではありません。入力キューでブロックし、有効なメッセージを受信するとユースケースを実行し、結果を出力キューにプッシュします。複数のクライアントを持つことで、物事は少し難しくなりますが、それでも私の経験レベルでは理解できます. サーバーは開いている接続ごとにスレッドを維持し、サーバーのアウトバウンド/アプリケーション インバウンド キューは同期されているため、2 つのメッセージが一度に到着した場合、2 番目のスレッドは最初のスレッドがそのペイロードをキューに配信するまで少し待つ必要があります。

問題は、2 つの独立したものをブロックする必要がある中間のサーバーです。サーバーは、クライアントとアプリケーションの出力キュー (サーバーの入力キューとして機能) の両方を監視しています。サーバーは、メッセージがクライアントから着信する (アプリケーションに転送される) まで、またはアプリケーションがタスクを完了して結果をアプリケーションのアウトバウンド/サーバー インバウンド キューにプッシュするまで、ブロックする必要があります。

私の知る限り、Java がブロックできるのは 1 つのことだけです。

クライアントがメッセージを送信するか、サーバーの受信キューが空でなくなるまで、サーバーをブロックすることは可能ですか?

アップデート:

私はこれに取り組むのに少し時間があり、問題を説明する最小限のものまで問題を切り詰めることができました。トリミングがあっても、従うべきややかさばるコード ダンプがあります。できるだけ分解してみます。

これはサーバーのコードです:

public class Server implements Runnable {

    private int                     listenPort          = 0;
    private ServerSocket            serverSocket        = null;
    private BlockingQueue<Message>  upstreamMessaes     = null;
    private BlockingQueue<Message>  downstreamMessages  = null;
    private Map<Integer, Session>   sessions            = new ConcurrentHashMap ();
    private AtomicInteger           lastId              = new AtomicInteger ();

    /**
     * Start listening for clients to process
     * 
     * @throws IOException 
     */
    @Override
    public void run () {
        int     newSessionId;
        Session newSession;
        Thread  newThread;

        System.out.println (this.getClass () + " running");

        // Client listen loop
        while (true) {
            newSessionId    = this.lastId.incrementAndGet ();
            try {
                newSession  = new Session (this, newSessionId);
                newThread   = new Thread (newSession);
                newThread.setName ("ServerSession_" + newSessionId);
                this.sessions.put (newSessionId, newSession);
                newThread.start ();
            } catch (IOException ex) {
                Logger.getLogger (Server.class.getName ()).log (Level.SEVERE, null, ex);
            }
        }
    }

    /**
     * Accept a connection from a new client
     * 
     * @return The accepted Socket
     * @throws IOException 
     */
    public Socket accept () throws IOException {
        return this.getSocket().accept ();
    }

    /**
     * Delete the specified Session
     * 
     * @param sessionId ID of the Session to remove
     */
    public void deleteSession (int sessionId) {
        this.sessions.remove (sessionId);
    }

    /**
     * Forward an incoming message from the Client to the application
     * 
     * @param msg
     * @return
     * @throws InterruptedException 
     */
    public Server messageFromClient (Message msg) throws InterruptedException {
        this.upstreamMessaes.put (msg);
        return this;
    }

    /**
     * Set the port to listen to
     * 
     * We can only use ports in the range 1024-65535 (ports below 1024 are 
     * reserved for common protocols such as HTTP and ports above 65535 don't
     * exist)
     * 
     * @param listenPort
     * @return Returns itself so further methods can be called
     * @throws IllegalArgumentException 
     */
    public final Server setListenPort (int listenPort) throws IllegalArgumentException {
        if ((listenPort > 1023) && (listenPort <= 65535)) {
            this.listenPort = listenPort;
        } else {
            throw new IllegalArgumentException ("Port number " + listenPort + " not valid");
        }

        return this;
    }

    /**
     * Get the server socket, initialize it if it isn't already started.
     * 
     * @return The object's ServerSocket
     * @throws IOException 
     */
    private ServerSocket getSocket () throws IOException {
        if (null == this.serverSocket) {
            this.serverSocket = new ServerSocket (this.listenPort);
        }

        return this.serverSocket;
    }

    /**
     * Instantiate the server
     * 
     * @param listenPort
     * @throws IllegalArgumentException 
     */
    public Server ( int listenPort, 
                    BlockingQueue<Message> incomingMessages, 
                    BlockingQueue<Message> outgoingMessages) throws IllegalArgumentException {
        this.setListenPort (listenPort);
        this.upstreamMessaes    = incomingMessages;
        this.downstreamMessages = outgoingMessages;
        System.out.println (this.getClass () + " created");
        System.out.println ("Listening on port " + listenPort);
    }
}

次のメソッドはサーバーに属していると思いますが、現在はコメントアウトされています。

    /**
     * Notify a Session of a message for it
     * 
     * @param sessionMessage 
     */
    public void notifySession () throws InterruptedException, IOException {
        Message sessionMessage  = this.downstreamMessages.take ();
        Session targetSession   = this.sessions.get (sessionMessage.getClientID ());
        targetSession.waitForServer (sessionMessage);
    }

これは私のセッションクラスです

public class Session implements Runnable {
    private Socket              clientSocket    = null;
    private OutputStreamWriter  streamWriter    = null;
    private StringBuffer        outputBuffer    = null;
    private Server              server          = null;
    private int                 sessionId       = 0;

    /**
     * Session main loop
     */
    @Override
    public void run () {

        StringBuffer    inputBuffer = new StringBuffer ();
        BufferedReader  inReader;

        try {
            // Connect message
            this.sendMessageToClient ("Hello, you are client " + this.getId ());
            inReader    = new BufferedReader (new InputStreamReader (this.clientSocket.getInputStream (), "UTF8"));

            do {
                // Parse whatever was in the input buffer
                inputBuffer.delete (0, inputBuffer.length ());
                inputBuffer.append (inReader.readLine ());
                System.out.println ("Input message was: " + inputBuffer);
                this.server.messageFromClient (new Message (this.sessionId, inputBuffer.toString ()));
            } while (!"QUIT".equals (inputBuffer.toString ()));

            // Disconnect message
            this.sendMessageToClient ("Goodbye, client " + this.getId ());
        } catch (IOException | InterruptedException e) {
            Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, null, e);
        } finally {
            this.terminate ();
            this.server.deleteSession (this.getId ());
        }
    }

    /**
     * 
     * @param msg
     * @return
     * @throws IOException 
     */
    public Session waitForServer (Message msg) throws IOException {
        // Generate a response for the input
        String output   = this.buildResponse (msg.getPayload ()).toString ();
        System.out.println ("Output message will be: " + output);

        // Output to client
        this.sendMessageToClient (output);

        return this;
    }

    /**
     * 
     * @param request
     * @return 
     */
    private StringBuffer buildResponse (CharSequence request) {
        StringBuffer ob = this.outputBuffer;
        ob.delete (0, this.outputBuffer.length ());

        ob.append ("Server repsonded at ")
          .append (new java.util.Date ().toString () )
          .append (" (You said '" )
          .append (request)
          .append ("')");

        return this.outputBuffer;
    }

    /**
     * Send the given message to the client
     * 
     * @param message
     * @throws IOException 
     */
    private void sendMessageToClient (CharSequence message) throws IOException {
        // Output to client
        OutputStreamWriter osw  = this.getStreamWriter ();
        osw.write ((String) message);
        osw.write ("\r\n");
        osw.flush ();
    }

    /**
     * Get an output stream writer, initialize it if it's not active
     * 
     * @return A configured OutputStreamWriter object
     * @throws IOException 
     */
    private OutputStreamWriter getStreamWriter () throws IOException {
        if (null == this.streamWriter) {
            BufferedOutputStream os = new BufferedOutputStream (this.clientSocket.getOutputStream ());
            this.streamWriter       = new OutputStreamWriter (os, "UTF8");
        }

        return this.streamWriter;
    }

    /**
     * Terminate the client connection
     */
    private void terminate () {
        try {
            this.streamWriter   = null;
            this.clientSocket.close ();
        } catch (IOException e) {
            Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, null, e);
        }
    }

    /**
     * Get this Session's ID
     * 
     * @return The ID of this session
     */
    public int getId () {
        return this.sessionId;
    }

    /**
     * Session constructor
     * 
     * @param owner The Server object that owns this session
     * @param sessionId The unique ID this session will be given
     * @throws IOException 
     */
    public Session (Server owner, int sessionId) throws IOException {
        System.out.println ("Class " + this.getClass () + " created");

        this.server         = owner;
        this.sessionId      = sessionId;
        this.clientSocket   = this.server.accept ();

        System.out.println ("Session ID is " + this.sessionId);
    }
}

テスト アプリケーションはかなり基本的なもので、元の要求メッセージの変更されたバージョンをエコー バックするだけです。実際のアプリケーションは、メッセージを受信し、サーバーに意味のある応答を返します。

public class TestApp implements Runnable {
    private BlockingQueue <Message> inputMessages, outputMessages;

    @Override
    public void run () {
        Message         lastMessage;
        StringBuilder   returnMessage   = new StringBuilder ();

        while (true) {
            try {
                lastMessage = this.inputMessages.take ();

                // Construct a response
                returnMessage.delete (0, returnMessage.length ());
                returnMessage.append ("Server repsonded at ")
                             .append (new java.util.Date ().toString () )
                             .append (" (You said '" )
                             .append (lastMessage.getPayload ())
                             .append ("')");

                // Pretend we're doing some work that takes a while
                Thread.sleep (1000);

                this.outputMessages.put (new Message (lastMessage.getClientID (), lastMessage.toString ()));
            } catch (InterruptedException ex) {
                Logger.getLogger (TestApp.class.getName ()).log (Level.SEVERE, null, ex);
            }
        }
    }

    /**
     * Initialize the application
     * 
     * @param inputMessages Where input messages come from
     * @param outputMessages Where output messages go to
     */
    public TestApp (BlockingQueue<Message> inputMessages, BlockingQueue<Message> outputMessages) {
        this.inputMessages = inputMessages;
        this.outputMessages = outputMessages;
        System.out.println (this.getClass () + " created");
    }
}

Message クラスは非常に単純で、元のクライアント ID とペイロード文字列だけで構成されているため、省略しています。

最後にメインクラスはこんな感じ。

public class Runner {
    /**
     * 
     * @param args The first argument is the port to listen on.
     * @throws Exception  
     */
    public static void main (String[] args) throws Exception {
        BlockingQueue<Message>  clientBuffer    = new LinkedBlockingQueue (); 
        BlockingQueue<Message>  appBuffer       = new LinkedBlockingQueue ();
        TestApp appInstance                     = new TestApp (clientBuffer, appBuffer);
        Server serverInstance                   = new Server (Integer.parseInt (args [0]), clientBuffer, appBuffer);
        Thread appThread                        = new Thread (appInstance);
        Thread serverThread                     = new Thread (serverInstance);

        appThread.setName("Application");
        serverThread.setName ("Server");

        appThread.start ();
        serverThread.start ();

        appThread.join ();
        serverThread.join ();

        System.exit (0);
    }
}

実際のアプリケーションはより複雑になりますが、TestApp は基本的な使用パターンを示しています。そこに何かがあるまで入力キューでブロックし、それを処理してから、結果を出力キューにプッシュします。

セッション クラスは、特定のクライアントとサーバー間のライブ接続を管理します。クライアントから入力を受け取り、それを Message オブジェクトに変換します。また、サーバーから Message オブジェクトを受け取り、それらを出力に変換してクライアントに送信します。

サーバーは、新しい着信接続をリッスンし、着信接続ごとに Session オブジェクトを設定します。セッションがメッセージを渡すと、アプリケーションが処理できるようにアップストリーム キューに入れます。

私が抱えている問題は、TestApp からさまざまなクライアントに戻るメッセージを返すことです。クライアントからのメッセージが着信すると、セッションはメッセージを生成してサーバーに送信します。サーバーはそれをアップストリーム キューに入れます。これは、TestApp の入力キューでもあります。これに応答して、TestApp は応答メッセージを生成し、サーバーのダウンストリーム キューでもある出力キューに入れます。

これは、セッションが 2 つの無関係なイベントを待機する必要があることを意味します。彼らはまでブロックする必要があります

  • 入力がクライアントから到着します (クライアント ソケットの BufferedReader は処理する入力を持っています)。
  • または、サーバーからメッセージが送信されます (サーバーはセッションで WaitForServer () メソッドを呼び出します)。

サーバー自体に関しては、2 つの無関係なイベントも待機する必要があります。

  • Session は、TestApp に渡すメッセージを含む messageFromClient() を呼び出します。
  • または、TestApp がメッセージを出力/ダウンストリーム キューにプッシュして、セッションに渡します。

一見簡単そうに見えた作業が、当初の想像よりもはるかに困難であることがわかりました。私はまだ並行プログラミングにかなり慣れていないので、明らかなことを見落としていると思いますが、どこが間違っているのかを指摘していただけると助かります。

4

2 に答える 2

1

実装ではメソッドを使用してクライアント-セッション-サーバー間でデータを渡しているため、実際には差し迫った問題は既に解決しています。ただし、これは意図したものではない可能性があります。何が起こっているかは次のとおりです。

セッションのrunメソッドは独自のスレッドで実行され、ソケットでブロックされています。サーバーが を呼び出すと、このメソッドはサーバーのスレッドでwaitForServerすぐに実行されます。Java では、スレッドがメソッドを呼び出すと、そのメソッドがそのスレッドで実行されるため、この場合、セッションのブロックを解除する必要はありませんでした。解決しようとしている問題を作成するには、メソッドを削除してキューに置き換える必要があります。その後、サーバーはこのキューにメッセージを配置し、セッションはそれをブロックする必要があり、その結果、セッションは2 つの異なるオブジェクト (ソケットとキュー) でブロックします。waitForServerBlockingQueue messagesFromServer

セッションが 2 つのオブジェクトでブロックする必要がある実装に切り替えると仮定すると、コメントで説明した 2 つのアプローチのハイブリッドでこれを解決できると思います。

各セッションのソケットには、ブロックするためのスレッドが必要です。これを、ソケットをポーリングして数分間スリープする固定スレッド プール (たとえば、4 つのスレッド) に置き換えない限り、これを回避する方法はありません。それらから読み取るものが何もない場合は、数十ミリ秒。

すべてのサーバー -> セッション トラフィックを 1 つのキューとその上でブロックする 1 つのスレッドで管理できます。サーバーはペイロードにセッションの「アドレス」を含めて、ブロックしているスレッドがメッセージの処理方法を認識できるようにします。多くのセッションがあるときにこれがスケーリングしないことがわかった場合は、いつでもスレッド/キューの数を増やすことができます。たとえば、32 セッションの場合、スレッド/キューごとに 4 つのスレッド/キュー、スレッド/キューごとに 8 つのセッションを持つことができます。

于 2013-04-27T14:38:45.747 に答える
0

誤解しているかもしれませんが、メッセージを「リッスン」するコードがある場合は、単純な OR ステートメントを使用してこれを解決できるはずです。

もう 1 つの便利な方法は、すべてのクライアントに一意の ID を追加することです。これにより、メッセージがどのクライアントを対象としているかがわかります。

于 2013-04-26T17:28:17.820 に答える