6

SOについても同様の質問をたくさん見ましたが、写真にはほとんどありませんSocket。ですので、時間をかけて質問を読んでください。

リクエストをリッスンするサーバーアプリ(を使用ServerSocket)があり、クライアントが接続を試みると、クライアントにサービスを提供するために新しいスレッドが作成されます(サーバーは新しいリクエストのリッスンモードに戻ります)。ここで、他のクライアントがサーバーに送信した内容に基づいて、1つのクライアントに応答する必要があります。

例:

  • 着信接続をリッスンするServerSocket。
  • クライアントAが接続し、Aにサービスを提供するために新しいスレッドが作成されます。
  • クライアントBが接続し、Bにサービスを提供するために新しいスレッドが作成されます。
  • Aは、「HellofromA」というメッセージをサーバーに送信します。
  • このメッセージをクライアントBへの応答として送信します。

私はこの「スレッド間通信」全体に不慣れです。明らかに、上記の状況は非常に単純に聞こえますが、サーバーを中間として維持しているクライアント間で大量のデータを交換するため、ヒントを得るためにこれを説明しています。

また、共有オブジェクトをたとえば10個の特定のクライアントに制限したい場合はどうすればよいですか?11番目のクライアントがサーバーに接続するときに、新しい共有オブジェクトを作成します。これは、11番目、12番目、13番目.....から20番目までのクライアント間でデータを交換するために使用されます。10クライアントのすべてのセットについても同様です。

私が試したこと:(ばかげていると思います)

  • publicそのオブジェクトがとして共有されることになっているクラスがあるpublic staticので、のようにインスタンス化せずにグローバルとして使用できますMyGlobalClass.SharedMsg
  • それは機能しません。あるスレッドで受信したデータを別のスレッドに送信できませんでした。

1つのスレッドがオブジェクトに書き込みを行っている場合、最初のスレッドが書き込みを完了するまで、他のスレッドはそのオブジェクトにアクセスできないため、明らかなロックの問題があることを認識しています。

では、この問題への理想的なアプローチは何でしょうか?

アップデート

着信接続要求を処理するためのスレッドを作成する方法以来、上記のようにグローバルオブジェクトを使用することは機能しないため、スレッド間で同じオブジェクトを共有する方法を理解できません。

以下は、着信接続をリッスンし、サービングスレッドを動的に作成する方法です。

// Method of server class
public void startServer()
{
    if (!isRunning)
    {
        try
        {
            isRunning = true;
            while (isRunning)
            {
                try
                {
                    new ClientHandler(mysocketserver.accept()).start();
                }
                catch (SocketTimeoutException ex)
                {
                    //nothing to perform here, go back again to listening.
                }
                catch (SocketException ex)
                {
                    //Not to handle, since I'll stop the server using SocketServer's close() method, and its going to throw SocketException anyway.
                }
            }
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
    }
    else
        System.out.println("Server Already Started!");
}

そしてClientHandlerクラス。

public class ClientHandler extends Thread
{
    private Socket client = null;
    private ObjectInputStream in = null;
    private ObjectOutputStream out = null;

    public ClientHandler(Socket client)
    {
        super("ClientHandler");
        this.client = client;
    }

    //This run() is common for every Client that connects, and that's where the problem is.
    public void run()
    {
        try
        {
            in = new ObjectInputStream(client.getInputStream());
            out = new ObjectOutputStream(client.getOutputStream());

            //Message received from this thread.
            String msg = in.readObject().toString();
            System.out.println("Client @ "+ client.getInetAddress().getHostAddress() +" Says : "+msg);


            //Response to this client.
            out.writeObject("Message Received");

            out.close();
            in.close();
            client.close();
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
    }
}

接続する各クライアントにサービスを提供する動的スレッドを作成する方法は、グローバルオブジェクトを使用して共有することはできないと思います。これは、run()上記の本体が接続するすべてのクライアントでまったく同じであるため、この同じ方法が消費者と生産者の両方。接続ごとに動的スレッドを作成し、同じオブジェクトを共有できるようにするには、どのような修正を行う必要がありますか。

4

1 に答える 1

6

おそらく、各クライアント間の通信用にキューが必要です。各キューは、あるクライアントから別のクライアントにプッシュされるデータの「パイプライン」になります。

次のように使用します (疑似コード):

Thread 1:
Receive request from Client A, with message for Client B
Put message on back of concurrent Queue A2B
Respond to Client A.

Thread 2:
Receive request from Client B.
Pop message from front of Queue A2B
Respond to Client B with message.

多くのクライアント (したがって、多くのスレッド) が書き込める AllToB Queue を用意するために、一般的なものも必要になる場合があります。

注目のクラス: ConcurrentLinkedQueueArrayBlockingQueue

メッセージの数を制限したい場合は、ArrayBlockingQueue とその容量コンストラクターを使用してこれを行うことができます。ブロッキング機能が必要ない場合は、offerandpollではなくputandを使用できますtake

キューを共有することについては心配しません。問題がかなり複雑になります。これは、対処する必要があるメモリ使用量の問題があることがわかっている場合にのみ行ってください。

編集:あなたの更新に基づいて:

動的に作成されたすべてのインスタンス間で単一のインスタンスを共有する必要がある場合は、次のいずれかを実行できます。

  1. 静的インスタンスを作成します。
  2. コンストラクタに渡します。

1の例:

public class ClientHandler extends Thread
{
  public static final Map<ClientHandler, BlockingQueue<String>> messageQueues 
    = new ConcurrentHashMap<>();

  <snip>

  public ClientHandler(Socket client)
  {
    super("ClientHandler");
    this.client = client;
    // Note: Bad practice to reference 'this' in a constructor.
    // This can throw an error based on what the put method does.
    // As such, if you are to do this, put it at the end of the method.
    messageQueues.put(this, new ArrayBlockingQueue<>());
  }

  // You can now access this in the run() method like so:
  // Get messages for the current client.
  // messageQueues.get(this).poll();
  // Send messages to the thread for another client.
  // messageQueues.get(someClient).offer(message);

いくつかのメモ:

  • messageQueues オブジェクトには、有効期間が短いオブジェクト参照ではなく、実際にはクライアントの何らかの識別子が含まれている必要があります。
  • よりテストしやすい設計では、messageQueues オブジェクトをコンストラクターに渡してモックを許可します。
  • おそらく、マップのラッパー クラスを使用することをお勧めします。これにより、マップのセマンティクスを気にすることなく、2 つのパラメーターを指定して offer を呼び出すことができます。
于 2012-06-08T06:17:56.283 に答える