websocket サーバーからメッセージを送受信する Java アプリを作成しています。アプリがメッセージを受信すると、処理に時間がかかる場合があります。したがって、複数のスレッドを使用してメッセージを受信しようとしています。私の理解でGrizzly
は、セレクタースレッドとワーカースレッドがあります。デフォルトでは、1 つのセレクター スレッドと 2 つのワーカー スレッドがあります。次の例では、これらをそれぞれ 5 と 10 に増やしています。以下の例では、を呼び出すスレッドを一時停止していますonMessage
受信情報の処理をシミュレートするための 10 秒間のメソッド。情報は毎秒入ってくるため、10 個のスレッドでトラフィック量を処理できるはずです。実行をプロファイリングすると、1 つのセレクター スレッドのみが実行され、2 つの作業スレッドが実行されます。さらに、メッセージは 10 秒間隔でのみ受信されます。1 つのスレッドだけがトラフィックを処理していることを示しています。これは非常に奇妙です。プロファイリング中、たとえば 1 つのワーカー スレッドGrizzly(1)
が最初に送信されたメッセージを受信します。それから 10 秒後、'Grizzly(2)' は 2 番目のメッセージを受信します。その後Grizzly(2)
、メッセージを受信し続け、Grizzly(1)
何もアクションを実行しません。
誰かがこの奇妙な動作と、たとえば 10 スレッドが常にメッセージを待っているように変更する方法を説明できますか?
主要:
public static void main(String[] args) {
WebsocketTextClient client = new WebsocketTextClient();
client.connect();
for (int i = 0; i < 60; i++) {
client.send("Test message " + i);
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println("Error sleeping!");
}
}
}
WebsocketTextClient.java:
import java.net.URI;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.Endpoint;
import javax.websocket.MessageHandler;
import org.glassfish.tyrus.client.ClientManager;
import org.glassfish.tyrus.client.ThreadPoolConfig;
import org.glassfish.tyrus.container.grizzly.client.GrizzlyClientProperties;
public class WebsocketTextClient {
private ClientManager client;
private ClientEndpointConfig clientConfig;
WebsocketTextClientEndpoint endpoint;
public WebsocketTextClient() {
client = ClientManager.createClient();
client.getProperties().put(GrizzlyClientProperties.SELECTOR_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(5));
client.getProperties().put(GrizzlyClientProperties.WORKER_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(10));
}
public boolean connect() {
try {
clientConfig = ClientEndpointConfig.Builder.create().build();
endpoint = new WebsocketTextClientEndpoint();
client.connectToServer(endpoint, clientConfig, new URI("wss://echo.websocket.org"));
} catch (Exception e) {
return false;
}
return true;
}
public boolean disconnect() {
return false;
}
public boolean send(String message) {
endpoint.session.getAsyncRemote().sendText(message);
return true;
}
private class WebsocketTextClientEndpoint extends Endpoint {
Session session;
@Override
public void onOpen(Session session, EndpointConfig config) {
System.out.println("Connection opened");
this.session = session;
session.addMessageHandler(new WebsocketTextClientMessageHandler());
}
}
private class WebsocketTextClientMessageHandler implements MessageHandler.Whole<String> {
@Override
public void onMessage(String message) {
System.out.println("Message received from " + Thread.currentThread().getName() + " " + message);
try {
Thread.sleep(10000);
} catch (Exception e) {
System.out.println("Error sleeping!");
}
System.out.println("Resuming");
}
}
}