私は経験豊富な Java プログラマーではないので、これが初心者の質問である場合はご容赦ください。
大きく 3 つのモジュールで構成されるシステムを設計しています。クライアント、サーバー、およびアプリケーション。アイデアは、クライアントがサーバーにメッセージを送信することです。サーバーは、アプリケーションでユース ケースをトリガーします。ユースケースの結果はサーバーに返され、サーバーはその結果をクライアントに送信します。私がこのアーキテクチャを選択したのは、一度に複数のクライアントをサポートする必要があると予想しているためです。サーバー モジュールを他のアプリケーションで再利用できるようにしたいのです。可能な限りドメイン ロジックを実装し、より高度な Java を学習する機会があるためです。
さまざまなモジュールをキューで結び付けることを計画しています。クライアントは十分に単純です。メッセージを発行し、応答が到着するまでブロックします (単純化しすぎている可能性がありますが、現時点では妥当なモデルです)。アプリケーションも同様に問題ではありません。入力キューでブロックし、有効なメッセージを受信するとユースケースを実行し、結果を出力キューにプッシュします。複数のクライアントを持つことで、物事は少し難しくなりますが、それでも私の経験レベルでは理解できます. サーバーは開いている接続ごとにスレッドを維持し、サーバーのアウトバウンド/アプリケーション インバウンド キューは同期されているため、2 つのメッセージが一度に到着した場合、2 番目のスレッドは最初のスレッドがそのペイロードをキューに配信するまで少し待つ必要があります。
問題は、2 つの独立したものをブロックする必要がある中間のサーバーです。サーバーは、クライアントとアプリケーションの出力キュー (サーバーの入力キューとして機能) の両方を監視しています。サーバーは、メッセージがクライアントから着信する (アプリケーションに転送される) まで、またはアプリケーションがタスクを完了して結果をアプリケーションのアウトバウンド/サーバー インバウンド キューにプッシュするまで、ブロックする必要があります。
私の知る限り、Java がブロックできるのは 1 つのことだけです。
クライアントがメッセージを送信するか、サーバーの受信キューが空でなくなるまで、サーバーをブロックすることは可能ですか?
アップデート:
私はこれに取り組むのに少し時間があり、問題を説明する最小限のものまで問題を切り詰めることができました。トリミングがあっても、従うべきややかさばるコード ダンプがあります。できるだけ分解してみます。
これはサーバーのコードです:
public class Server implements Runnable {
private int listenPort = 0;
private ServerSocket serverSocket = null;
private BlockingQueue<Message> upstreamMessaes = null;
private BlockingQueue<Message> downstreamMessages = null;
private Map<Integer, Session> sessions = new ConcurrentHashMap ();
private AtomicInteger lastId = new AtomicInteger ();
/**
* Start listening for clients to process
*
* @throws IOException
*/
@Override
public void run () {
int newSessionId;
Session newSession;
Thread newThread;
System.out.println (this.getClass () + " running");
// Client listen loop
while (true) {
newSessionId = this.lastId.incrementAndGet ();
try {
newSession = new Session (this, newSessionId);
newThread = new Thread (newSession);
newThread.setName ("ServerSession_" + newSessionId);
this.sessions.put (newSessionId, newSession);
newThread.start ();
} catch (IOException ex) {
Logger.getLogger (Server.class.getName ()).log (Level.SEVERE, null, ex);
}
}
}
/**
* Accept a connection from a new client
*
* @return The accepted Socket
* @throws IOException
*/
public Socket accept () throws IOException {
return this.getSocket().accept ();
}
/**
* Delete the specified Session
*
* @param sessionId ID of the Session to remove
*/
public void deleteSession (int sessionId) {
this.sessions.remove (sessionId);
}
/**
* Forward an incoming message from the Client to the application
*
* @param msg
* @return
* @throws InterruptedException
*/
public Server messageFromClient (Message msg) throws InterruptedException {
this.upstreamMessaes.put (msg);
return this;
}
/**
* Set the port to listen to
*
* We can only use ports in the range 1024-65535 (ports below 1024 are
* reserved for common protocols such as HTTP and ports above 65535 don't
* exist)
*
* @param listenPort
* @return Returns itself so further methods can be called
* @throws IllegalArgumentException
*/
public final Server setListenPort (int listenPort) throws IllegalArgumentException {
if ((listenPort > 1023) && (listenPort <= 65535)) {
this.listenPort = listenPort;
} else {
throw new IllegalArgumentException ("Port number " + listenPort + " not valid");
}
return this;
}
/**
* Get the server socket, initialize it if it isn't already started.
*
* @return The object's ServerSocket
* @throws IOException
*/
private ServerSocket getSocket () throws IOException {
if (null == this.serverSocket) {
this.serverSocket = new ServerSocket (this.listenPort);
}
return this.serverSocket;
}
/**
* Instantiate the server
*
* @param listenPort
* @throws IllegalArgumentException
*/
public Server ( int listenPort,
BlockingQueue<Message> incomingMessages,
BlockingQueue<Message> outgoingMessages) throws IllegalArgumentException {
this.setListenPort (listenPort);
this.upstreamMessaes = incomingMessages;
this.downstreamMessages = outgoingMessages;
System.out.println (this.getClass () + " created");
System.out.println ("Listening on port " + listenPort);
}
}
次のメソッドはサーバーに属していると思いますが、現在はコメントアウトされています。
/**
* Notify a Session of a message for it
*
* @param sessionMessage
*/
public void notifySession () throws InterruptedException, IOException {
Message sessionMessage = this.downstreamMessages.take ();
Session targetSession = this.sessions.get (sessionMessage.getClientID ());
targetSession.waitForServer (sessionMessage);
}
これは私のセッションクラスです
public class Session implements Runnable {
private Socket clientSocket = null;
private OutputStreamWriter streamWriter = null;
private StringBuffer outputBuffer = null;
private Server server = null;
private int sessionId = 0;
/**
* Session main loop
*/
@Override
public void run () {
StringBuffer inputBuffer = new StringBuffer ();
BufferedReader inReader;
try {
// Connect message
this.sendMessageToClient ("Hello, you are client " + this.getId ());
inReader = new BufferedReader (new InputStreamReader (this.clientSocket.getInputStream (), "UTF8"));
do {
// Parse whatever was in the input buffer
inputBuffer.delete (0, inputBuffer.length ());
inputBuffer.append (inReader.readLine ());
System.out.println ("Input message was: " + inputBuffer);
this.server.messageFromClient (new Message (this.sessionId, inputBuffer.toString ()));
} while (!"QUIT".equals (inputBuffer.toString ()));
// Disconnect message
this.sendMessageToClient ("Goodbye, client " + this.getId ());
} catch (IOException | InterruptedException e) {
Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, null, e);
} finally {
this.terminate ();
this.server.deleteSession (this.getId ());
}
}
/**
*
* @param msg
* @return
* @throws IOException
*/
public Session waitForServer (Message msg) throws IOException {
// Generate a response for the input
String output = this.buildResponse (msg.getPayload ()).toString ();
System.out.println ("Output message will be: " + output);
// Output to client
this.sendMessageToClient (output);
return this;
}
/**
*
* @param request
* @return
*/
private StringBuffer buildResponse (CharSequence request) {
StringBuffer ob = this.outputBuffer;
ob.delete (0, this.outputBuffer.length ());
ob.append ("Server repsonded at ")
.append (new java.util.Date ().toString () )
.append (" (You said '" )
.append (request)
.append ("')");
return this.outputBuffer;
}
/**
* Send the given message to the client
*
* @param message
* @throws IOException
*/
private void sendMessageToClient (CharSequence message) throws IOException {
// Output to client
OutputStreamWriter osw = this.getStreamWriter ();
osw.write ((String) message);
osw.write ("\r\n");
osw.flush ();
}
/**
* Get an output stream writer, initialize it if it's not active
*
* @return A configured OutputStreamWriter object
* @throws IOException
*/
private OutputStreamWriter getStreamWriter () throws IOException {
if (null == this.streamWriter) {
BufferedOutputStream os = new BufferedOutputStream (this.clientSocket.getOutputStream ());
this.streamWriter = new OutputStreamWriter (os, "UTF8");
}
return this.streamWriter;
}
/**
* Terminate the client connection
*/
private void terminate () {
try {
this.streamWriter = null;
this.clientSocket.close ();
} catch (IOException e) {
Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, null, e);
}
}
/**
* Get this Session's ID
*
* @return The ID of this session
*/
public int getId () {
return this.sessionId;
}
/**
* Session constructor
*
* @param owner The Server object that owns this session
* @param sessionId The unique ID this session will be given
* @throws IOException
*/
public Session (Server owner, int sessionId) throws IOException {
System.out.println ("Class " + this.getClass () + " created");
this.server = owner;
this.sessionId = sessionId;
this.clientSocket = this.server.accept ();
System.out.println ("Session ID is " + this.sessionId);
}
}
テスト アプリケーションはかなり基本的なもので、元の要求メッセージの変更されたバージョンをエコー バックするだけです。実際のアプリケーションは、メッセージを受信し、サーバーに意味のある応答を返します。
public class TestApp implements Runnable {
private BlockingQueue <Message> inputMessages, outputMessages;
@Override
public void run () {
Message lastMessage;
StringBuilder returnMessage = new StringBuilder ();
while (true) {
try {
lastMessage = this.inputMessages.take ();
// Construct a response
returnMessage.delete (0, returnMessage.length ());
returnMessage.append ("Server repsonded at ")
.append (new java.util.Date ().toString () )
.append (" (You said '" )
.append (lastMessage.getPayload ())
.append ("')");
// Pretend we're doing some work that takes a while
Thread.sleep (1000);
this.outputMessages.put (new Message (lastMessage.getClientID (), lastMessage.toString ()));
} catch (InterruptedException ex) {
Logger.getLogger (TestApp.class.getName ()).log (Level.SEVERE, null, ex);
}
}
}
/**
* Initialize the application
*
* @param inputMessages Where input messages come from
* @param outputMessages Where output messages go to
*/
public TestApp (BlockingQueue<Message> inputMessages, BlockingQueue<Message> outputMessages) {
this.inputMessages = inputMessages;
this.outputMessages = outputMessages;
System.out.println (this.getClass () + " created");
}
}
Message クラスは非常に単純で、元のクライアント ID とペイロード文字列だけで構成されているため、省略しています。
最後にメインクラスはこんな感じ。
public class Runner {
/**
*
* @param args The first argument is the port to listen on.
* @throws Exception
*/
public static void main (String[] args) throws Exception {
BlockingQueue<Message> clientBuffer = new LinkedBlockingQueue ();
BlockingQueue<Message> appBuffer = new LinkedBlockingQueue ();
TestApp appInstance = new TestApp (clientBuffer, appBuffer);
Server serverInstance = new Server (Integer.parseInt (args [0]), clientBuffer, appBuffer);
Thread appThread = new Thread (appInstance);
Thread serverThread = new Thread (serverInstance);
appThread.setName("Application");
serverThread.setName ("Server");
appThread.start ();
serverThread.start ();
appThread.join ();
serverThread.join ();
System.exit (0);
}
}
実際のアプリケーションはより複雑になりますが、TestApp は基本的な使用パターンを示しています。そこに何かがあるまで入力キューでブロックし、それを処理してから、結果を出力キューにプッシュします。
セッション クラスは、特定のクライアントとサーバー間のライブ接続を管理します。クライアントから入力を受け取り、それを Message オブジェクトに変換します。また、サーバーから Message オブジェクトを受け取り、それらを出力に変換してクライアントに送信します。
サーバーは、新しい着信接続をリッスンし、着信接続ごとに Session オブジェクトを設定します。セッションがメッセージを渡すと、アプリケーションが処理できるようにアップストリーム キューに入れます。
私が抱えている問題は、TestApp からさまざまなクライアントに戻るメッセージを返すことです。クライアントからのメッセージが着信すると、セッションはメッセージを生成してサーバーに送信します。サーバーはそれをアップストリーム キューに入れます。これは、TestApp の入力キューでもあります。これに応答して、TestApp は応答メッセージを生成し、サーバーのダウンストリーム キューでもある出力キューに入れます。
これは、セッションが 2 つの無関係なイベントを待機する必要があることを意味します。彼らはまでブロックする必要があります
- 入力がクライアントから到着します (クライアント ソケットの BufferedReader は処理する入力を持っています)。
- または、サーバーからメッセージが送信されます (サーバーはセッションで WaitForServer () メソッドを呼び出します)。
サーバー自体に関しては、2 つの無関係なイベントも待機する必要があります。
- Session は、TestApp に渡すメッセージを含む messageFromClient() を呼び出します。
- または、TestApp がメッセージを出力/ダウンストリーム キューにプッシュして、セッションに渡します。
一見簡単そうに見えた作業が、当初の想像よりもはるかに困難であることがわかりました。私はまだ並行プログラミングにかなり慣れていないので、明らかなことを見落としていると思いますが、どこが間違っているのかを指摘していただけると助かります。