5

学術的な演習として構築しているマルチスレッド サーバーで問題が発生しています。具体的には、接続を正常に終了することに関してです。

各接続は、Session クラスによって管理されます。このクラスは、接続用に DownstreamThread と UpstreamThread の 2 つのスレッドを維持します。

UpstreamThread はクライアント ソケットでブロックし、すべての着信文字列をメッセージにエンコードして、別のレイヤーに渡して処理します。クライアントへのメッセージが挿入される BlockingQueue の DownstreamThread ブロック。キューにメッセージがある場合、ダウンストリーム スレッドはメッセージをキューから取り出し、文字列に変換してクライアントに送信します。最終的なシステムでは、アプリケーション層が受信メッセージに作用し、送信メッセージをサーバーにプッシュして適切なクライアントに送信しますが、今のところ、受信メッセージをエコーバックする前に 1 秒間スリープする単純なアプリケーションがあります。タイムスタンプが追加された送信メッセージとして。

私が抱えている問題は、クライアントが切断されたときにすべてを正常にシャットダウンすることです。私が取り組んでいる最初の問題は、クライアントが QUIT コマンドで接続を終了していることをサーバーに知らせる通常の切断です。基本的な疑似コードは次のとおりです。

while (!quitting) {
    inputString = socket.readLine () // blocks
    if (inputString != "QUIT") {
        // forward the message upstream
        server.acceptMessage (inputString);
    } else {
        // Do cleanup
        quitting = true;
        socket.close ();
    }
}

アップストリーム スレッドのメイン ループは、入力文字列を調べます。QUIT の場合、スレッドはフラグを設定して、クライアントが通信を終了し、ループを終了したことを示します。これにより、上流のスレッドが適切にシャットダウンされます。

ダウンストリーム スレッドのメイン ループは、接続終了フラグが設定されていない限り、BlockingQueue でメッセージを待機します。その場合、下流のスレッドも終了するはずです。しかし、そうではありません。ただそこに座って待っています。その疑似コードは次のようになります。

while (!quitting) {
    outputMessage = messageQueue.take (); // blocks
    sendMessageToClient (outputMessage);
}

これをテストしたところ、クライアントが終了すると、上流のスレッドはシャットダウンされましたが、下流のスレッドはシャットダウンされませんでした。

少し頭をかきむしった後、ダウンストリーム スレッドがまだ BlockingQueue でブロックされており、決して来ない受信メッセージを待っていることに気付きました。アップストリーム スレッドは、QUIT メッセージをチェーンのそれ以上上に転送しません。

ダウンストリーム スレッドを正常にシャットダウンするにはどうすればよいですか? 最初に頭に浮かんだアイデアは、take() 呼び出しにタイムアウトを設定することでした。ただし、どのような値を選択しても、完全に満足できるものではないため、この考えにはあまり熱心ではありません。長すぎてシャットダウンする前にゾンビスレッドが長時間そこに留まっているか、短すぎて数分間アイドル状態になったがまだ有効な接続が強制終了されます。私は QUIT メッセージをチェーンに送信することを考えましたが、それには、サーバー、次にアプリケーション、そして再びサーバーに戻り、最後にセッションに完全に往復する必要があります。これもエレガントなソリューションとは思えません。

Thread.stop() のドキュメントを見ましたが、とにかく適切に機能しなかったため、これは明らかに非推奨であるため、実際にはオプションではないようです。私が持っていた別のアイデアは、何らかの方法でダウンストリーム スレッドで強制的に例外をトリガーし、finally ブロックでクリーンアップさせることでしたが、これは恐ろしくて下品なアイデアだと思いました。

両方のスレッドが独自に正常にシャットダウンできるはずだと思いますが、一方のスレッドが終了した場合、もう一方のスレッドがチェックするフラグを設定するだけでなく、より積極的な方法で終了するようにもう一方のスレッドにも通知する必要があると思います。私はまだ Java の経験が浅いので、現時点ではアイデアがありません。誰かアドバイスがあれば、大歓迎です。

完全を期すために、Session クラスの実際のコードを以下に含めましたが、上記の疑似コード スニペットが問題の関連部分をカバーしていると思います。フルクラスは約250行です。

import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.logging.*;

/**
 * Session class
 * 
 * A session manages the individual connection between a client and the server. 
 * It accepts input from the client and sends output to the client over the 
 * provided socket.
 * 
 */
public class Session {
    private Socket              clientSocket    = null;
    private Server              server          = null;
    private Integer             sessionId       = 0;
    private DownstreamThread    downstream      = null;
    private UpstreamThread      upstream        = null;
    private boolean             sessionEnding   = false;

    /**
     * This thread handles waiting for messages from the server and sending
     * them to the client
     */
    private class DownstreamThread implements Runnable {
        private BlockingQueue<DownstreamMessage>    incomingMessages    = null;
        private OutputStreamWriter                  streamWriter        = null;
        private Session                             outer               = null;

        @Override
        public void run () {
            DownstreamMessage message;
            Thread.currentThread ().setName ("DownstreamThread_" + outer.getId ());

            try {
                // Send connect message
                this.sendMessageToClient ("Hello, you are client " + outer.getId ());

                while (!outer.sessionEnding) {
                    message = this.incomingMessages.take ();
                    this.sendMessageToClient (message.getPayload ());
                }

                // Send disconnect message
                this.sendMessageToClient ("Goodbye, client " + getId ());

            } catch (InterruptedException | IOException ex) {
                Logger.getLogger (DownstreamThread.class.getName ()).log (Level.SEVERE, ex.getMessage (), ex);
            } finally {
                this.terminate ();
            }
        }

        /**
         * Add a message to the downstream queue
         * 
         * @param message
         * @return
         * @throws InterruptedException 
         */
        public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException {
            if (!outer.sessionEnding) {
                this.incomingMessages.put (message);
            }

            return this;
        }

        /**
         * Send the given message to the client
         * 
         * @param message
         * @throws IOException 
         */
        private DownstreamThread sendMessageToClient (CharSequence message) throws IOException {
            OutputStreamWriter osw;
            // Output to client
            if (null != (osw = this.getStreamWriter ())) {
                osw.write ((String) message);
                osw.write ("\r\n");
                osw.flush ();
            }

            return this;
        }

        /**
         * Perform session cleanup
         * 
         * @return 
         */
        private DownstreamThread terminate () {
            try {
                this.streamWriter.close ();
            } catch (IOException ex) {
                Logger.getLogger (DownstreamThread.class.getName ()).log (Level.SEVERE, ex.getMessage (), ex);
            }
            this.streamWriter   = null;

            return this;
        }

        /**
         * Get an output stream writer, initialize it if it's not active
         * 
         * @return A configured OutputStreamWriter object
         * @throws IOException 
         */
        private OutputStreamWriter getStreamWriter () throws IOException {
            if ((null == this.streamWriter) 
            && (!outer.sessionEnding)) {
                BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream ());
                this.streamWriter       = new OutputStreamWriter (os, "UTF8");
            }

            return this.streamWriter;
        }

        /**
         * 
         * @param outer 
         */
        public DownstreamThread (Session outer) {
            this.outer              = outer;
            this.incomingMessages   = new LinkedBlockingQueue ();
            System.out.println ("Class " + this.getClass () + " created");
        }
    }

    /**
     * This thread handles waiting for client input and sending it upstream
     */
    private class UpstreamThread implements Runnable {
        private Session outer   = null;

        @Override
        public void run () {
            StringBuffer    inputBuffer = new StringBuffer ();
            BufferedReader  inReader;

            Thread.currentThread ().setName ("UpstreamThread_" + outer.getId ());

            try {
                inReader    = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream (), "UTF8"));

                while (!outer.sessionEnding) {
                    // Read whatever was in the input buffer
                    inputBuffer.delete (0, inputBuffer.length ());
                    inputBuffer.append (inReader.readLine ());
                    System.out.println ("Input message was: " + inputBuffer);

                    if (!inputBuffer.toString ().equals ("QUIT")) {
                        // Forward the message up the chain to the Server
                        outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString ()));
                    } else {
                        // End the session
                        outer.sessionEnding = true;
                    }
                }

            } catch (IOException | InterruptedException e) {
                Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, e.getMessage (), e);
            } finally {
                outer.terminate ();
                outer.server.deleteSession (outer.getId ());
            }
        }

        /**
         * Class constructor
         * 
         * The Core Java volume 1 book said that a constructor such as this 
         * should be implicitly created, but that doesn't seem to be the case!
         * 
         * @param outer 
         */
        public UpstreamThread (Session outer) {
            this.outer  = outer;
            System.out.println ("Class " + this.getClass () + " created");
        }
    }

    /**
     * Start the session threads
     */
    public void run () //throws InterruptedException 
    {
        Thread upThread     = new Thread (this.upstream);
        Thread downThread   = new Thread (this.downstream);

        upThread.start ();
        downThread.start ();
    }

    /**
     * Accept a message to send to the client
     * 
     * @param message
     * @return
     * @throws InterruptedException 
     */
    public Session acceptMessage (DownstreamMessage message) throws InterruptedException {
        this.downstream.acceptMessage (message);
        return this;
    }

    /**
     * Accept a message to send to the client
     * 
     * @param message
     * @return
     * @throws InterruptedException 
     */
    public Session acceptMessage (String message) throws InterruptedException {
        return this.acceptMessage (new DownstreamMessage (this.getId (), message));
    }

    /**
     * Terminate the client connection
     */
    private void terminate () {
        try {
            this.clientSocket.close ();
        } catch (IOException e) {
            Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, e.getMessage (), e);
        }
    }

    /**
     * Get this Session's ID
     * 
     * @return The ID of this session
     */
    public Integer getId () {
        return this.sessionId;
    }

    /**
     * Session constructor
     * 
     * @param owner The Server object that owns this session
     * @param sessionId The unique ID this session will be given
     * @throws IOException 
     */
    public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException {

        this.server         = owner;
        this.clientSocket   = clientSocket;
        this.sessionId      = sessionId;
        this.upstream       = new UpstreamThread (this);
        this.downstream     = new DownstreamThread (this);

        System.out.println ("Class " + this.getClass () + " created");
        System.out.println ("Session ID is " + this.sessionId);
    }
}
4

2 に答える 2

3

Thread.stopuseを呼び出す代わりにThread.interrupt. これにより、メソッドは、シャットダウンする必要があることを知るために使用できるtakeをスローします。InterruptedException

于 2013-05-01T20:42:51.990 に答える
0

outer.sessionEnding「QUIT」が表示されたときにtrueに設定するのではなく、「偽の」終了メッセージを作成できますか。この偽のメッセージをキューに入れると、DownstreamThread が起動し、終了できます。その場合、この sessionEnding 変数を削除することもできます。

疑似コードでは、これは次のようになります。

while (true) {
    outputMessage = messageQueue.take (); // blocks
    if (QUIT == outputMessage)
        break
    sendMessageToClient (outputMessage);
}
于 2013-05-01T20:43:34.727 に答える