2

私は次の2行を書きました

ServerSocket mobCom = new ServerSocket(9846);
Socket server = mobCom.accept();

新しい TCP 接続を作成し、その接続を新しいスレッドで処理する必要があります。たとえば、上記のコードはサーバー ソケットを作成します。そして、複数のクライアントがあります。クライアントがサーバーに接続するたびに、その特定のクライアントからのリクエストに対応する新しいスレッドが作成される場合があります。同じことをどのように実装しますか。

編集

また、スレッド プールを 10 ユーザーに制限したいと考えています。さらに多くのユーザーが発生した場合は、それ以上のリクエストを処理せずにエラー メッセージを送信したいと考えています。

4

2 に答える 2

0

Java util のコンカレントのSynchronousQueueを使用して、目的の結果を得ることができます。一定数のワーカーを作成します。take呼び出しを使用して、SynchronousQueue へのブロック読み取りを開始します。したがって、すべてのワーカーがそれぞれ 1 つの作業を取得し、それらの処理 (ソケットとの通信) でビジー状態である場合、SynchronousQueue からの読み取りは行われないため、同期キューへのオファーは失敗します。この失敗をチェックすると (つまり、すべての固定数のワーカーがビジーであり、現在キューにラッチされているワーカーがないことを意味します)、次のリクエストを拒否します。

次の行のサンプル コード [未テスト - 簡潔にするために例外を回避しました。必要に応じて修正してください]。

public class BoundedServer 
{
    public static void main(String[] args) 
    {
        /**
         * Port to serve
         */
        final int port = 2013;

        /**
         * Max Workers
         */
        final int maxworkers = 10; 

        /**
         * The server socket.
         */
        ServerSocket mServerSocket = null;

        /**
         * Queue of work units to process if there is a worker available.
         */
        final SynchronousQueue<WorkUnit> mQueueToProcess = new SynchronousQueue<WorkUnit>();

        /**
         * Queue of work units to reject if there is no current worker available.
         */
        final LinkedBlockingQueue<WorkUnit> mQueueToReject = new LinkedBlockingQueue<WorkUnit>(); 

        /**
         * A thread pool to handle the work.
         */
        final ExecutorService communicationservice = Executors.newFixedThreadPool(maxworkers);

        /**
         * Let a single thread take care of rejecting the requests when needed to do so.
         */
        final ExecutorService rejectionservice = Executors.newSingleThreadExecutor();

        try 
        {
            Runnable communicationlauncher = new Runnable() 
            {
                public void run() 
                {
                    try
                    {
                        /**
                         * Set of workers to handle the work.
                         */
                        final CommunicationWorker[] workers = new CommunicationWorker[maxworkers];

                        communicationservice.invokeAll(Arrays.asList(workers));
                    }
                    finally
                    {
                        communicationservice.shutdown();
                    }
                }
            };

            new Thread(communicationlauncher).start();

            Runnable rejectionlauncher = new Runnable() 
            {
                public void run() 
                {
                    try
                    {
                        RejectionWorker rejectionworker = new RejectionWorker(mQueueToReject);

                        rejectionservice.submit(rejectionworker);
                    }
                    finally
                    {
                        rejectionservice.shutdown();
                    }
                }
            };
            new Thread(rejectionlauncher).start();

            mServerSocket = new ServerSocket(port);

            while(true)
            {
                WorkUnit work = new WorkUnit(mServerSocket.accept());

                if(!mQueueToProcess.offer(work))
                {
                    mQueueToReject.add(work);
                }
            }
        } 
        finally
        {
            try
            {
                mServerSocket.close();
            }
        }
    }
}


public class WorkUnit 
{
    private Socket mSocket = null;

    public WorkUnit(Socket socket) 
    {
        super();
        this.setSocket(socket);
    }

    public Socket getSocket() {
        return mSocket;
    }

    public void setSocket(Socket mSocket) {
        this.mSocket = mSocket;
    }
}

public class CommunicationWorker 
implements Callable<Boolean> 
{
    private SynchronousQueue<WorkUnit> mQueueToProcess;

    public CommunicationWorker(SynchronousQueue<WorkUnit> queueToProcess) 
    {
        super();
        this.mQueueToProcess = queueToProcess;
    }

    @Override
    public Boolean call() throws Exception 
    {
        while(true)
        {
            WorkUnit work = mQueueToProcess.take();

            Socket socket = work.getSocket();

            // Code to handle socket communication and closure.
            // Once the communication is finished, this thread will get blocked to mQueueToProcess.
        }
    }
}


public class RejectionWorker 
implements Callable<Boolean> 
{
    private LinkedBlockingQueue<WorkUnit> mQueueToReject;

    public RejectionWorker(LinkedBlockingQueue<WorkUnit> queueToReject) 
    {
        super();
        this.mQueueToReject = queueToReject;
    }

    @Override
    public Boolean call() throws Exception 
    {
        while(true)
        {
            WorkUnit work = mQueueToReject.take();

            Socket socket = work.getSocket();

            // Code to reject the request.
        }
    }
}
于 2013-03-27T20:09:57.920 に答える
-1

次のようなことをする必要があります。ServiceThread は、リクエストを処理するスレッドです。

 while (true) {
              try {
                  Socket clientSocket = null;
                  if (null != serverSocket) {
                    clientSocket = serverSocket.accept();
                    ServiceThread serverThread = new ServiceThread(clientSocket); // Create a new thread for each client
                    serverThread.start();
                  }
              }  catch( Exception ex ) {
                  System.out.println("Exception while accepting connection " + ex.getMessage());
                  ex.printStackTrace();
              }
于 2013-03-27T18:13:53.060 に答える