2

websocket サーバーからメッセージを送受信する Java アプリを作成しています。アプリがメッセージを受信すると、処理に時間がかかる場合があります。したがって、複数のスレッドを使用してメッセージを受信しようとしています。私の理解でGrizzlyは、セレクタースレッドとワーカースレッドがあります。デフォルトでは、1 つのセレクター スレッドと 2 つのワーカー スレッドがあります。次の例では、これらをそれぞれ 5 と 10 に増やしています。以下の例では、を呼び出すスレッドを一時停止していますonMessage受信情報の処理をシミュレートするための 10 秒間のメソッド。情報は毎秒入ってくるため、10 個のスレッドでトラフィック量を処理できるはずです。実行をプロファイリングすると、1 つのセレクター スレッドのみが実行され、2 つの作業スレッドが実行されます。さらに、メッセージは 10 秒間隔でのみ受信されます。1 つのスレッドだけがトラフィックを処理していることを示しています。これは非常に奇妙です。プロファイリング中、たとえば 1 つのワーカー スレッドGrizzly(1)が最初に送信されたメッセージを受信します。それから 10 秒後、'Grizzly(2)' は 2 番目のメッセージを受信します。その後Grizzly(2)、メッセージを受信し続け、Grizzly(1)何もアクションを実行しません。

誰かがこの奇妙な動作と、たとえば 10 スレッドが常にメッセージを待っているように変更する方法を説明できますか?

主要:

    public static void main(String[] args) {
        WebsocketTextClient client = new WebsocketTextClient();
        client.connect();
        for (int i = 0; i < 60; i++) {
            client.send("Test message " + i);
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println("Error sleeping!");
            }
        }
    }

WebsocketTextClient.java:

import java.net.URI;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.Endpoint;
import javax.websocket.MessageHandler;
import org.glassfish.tyrus.client.ClientManager;
import org.glassfish.tyrus.client.ThreadPoolConfig;
import org.glassfish.tyrus.container.grizzly.client.GrizzlyClientProperties;

public class WebsocketTextClient {

    private ClientManager client;
    private ClientEndpointConfig clientConfig;
    WebsocketTextClientEndpoint endpoint;

    public WebsocketTextClient() {
        client = ClientManager.createClient();
        client.getProperties().put(GrizzlyClientProperties.SELECTOR_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(5));
        client.getProperties().put(GrizzlyClientProperties.WORKER_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(10));
    }

    public boolean connect() {
        try {
            clientConfig = ClientEndpointConfig.Builder.create().build();
            endpoint = new WebsocketTextClientEndpoint();
            client.connectToServer(endpoint, clientConfig, new URI("wss://echo.websocket.org"));
        } catch (Exception e) {
            return false;
        }
        return true;
    }

    public boolean disconnect() {
        return false;
    }

    public boolean send(String message) {
        endpoint.session.getAsyncRemote().sendText(message);
        return true;
    }

    private class WebsocketTextClientEndpoint extends Endpoint {
        Session session;

        @Override
        public void onOpen(Session session, EndpointConfig config) {
            System.out.println("Connection opened");
            this.session = session;
            session.addMessageHandler(new WebsocketTextClientMessageHandler());
        }
    }

    private class WebsocketTextClientMessageHandler implements MessageHandler.Whole<String> {

        @Override
        public void onMessage(String message) {
            System.out.println("Message received from " + Thread.currentThread().getName() + " " + message);
            try {
                Thread.sleep(10000);
            } catch (Exception e) {
                System.out.println("Error sleeping!");
            }
            System.out.println("Resuming");
        }
    }
}
4

1 に答える 1

3

あなたが求めているように見えるのは、WebSockets が同じクライアント接続によって送信された複数のメッセージを受信し、それらのメッセージを別々のスレッドで処理し、準備が整ったときに応答を送信できるようにすることです。このシナリオは、クライアントがマルチスレッドの場合にのみ発生します。

同じ WebSocket セッションで複数のスレッドを処理するには、通常、WebSocket がクライアントとの間でやり取りされるデータを多重化する機能が必要です。これは現在 WebSocket の機能ではありませんが、その上に構築できることは確かです。ただし、これらのクライアント スレッドとサーバー スレッドを 1 つのチャネルで多重化すると、かなりの複雑さが生じます。これは、すべてのクライアント スレッドとサーバー スレッドが誤って相互に上書きしたり枯渇したりしないようにする必要があるためです。

MessageHandler の Java 仕様は、おそらくスレッド モデルに関して少しあいまいです。

https://docs.oracle.com/javaee/7/api/javax/websocket/MessageHandler.html言います:

各 Web ソケット セッションは、一度に複数のスレッドを使用して MessageHandler を呼び出しません。

しかし、ここで重要な用語は「ソケットセッション」です。クライアントが同じ WebSocket セッション内で複数のメッセージを送信している場合、サーバー側のハンドラーは単一のスレッド内で実行されます。これは、特に両端で Input/OutputStreams (または Writers) を使用している場合、スレッド内で多くの興味深いことを実行できないという意味ではありません。これ、クライアントとの通信が 1 つのスレッドによって仲介されることを意味します。通信を多重化したい場合は、ソケットの上に何かを書く必要があります。これには、リクエストをディスパッチするための独自のスレッド モデルの開発が含まれます。

より簡単な解決策は、クライアントの要求ごとに新しいセッションを作成することです。各クライアント要求は、セッション (つまり、TCP 接続) を開始し、データを送信して、結果を待ちます。これにより、複数の MessageHandler スレッド (仕様ごとにセッションごとに 1 つ) が提供されます。

これは、サーバー側でマルチスレッドを実現する最も簡単な方法です。他のアプローチでは、多重化メカニズムが必要になる傾向があります。これは、ユースケースによっては、おそらく努力する価値がなく、確かにいくらかの複雑さとリスクを伴います。

クライアントとサーバー間のセッション (TCP/HTTP 接続) の数が気になる場合は、クライアント側でセッションのプールを作成し、各クライアント セッションを一度に 1 つずつ使用して、セッションを返すことを検討できます。クライアントがそれを使い終わったときはいつでも、プールに。

最後に、直接関係ないかもしれませんが、Payara Micro を使用して WebSocket エンドポイントを提供するときに、次のように設定する必要があることがわかりました。

  <resources>
    ...
    <managed-executor-service maximum-pool-size="200" core-pool-size="10" long-running-tasks="true" keep-alive-seconds="300" hung-after-seconds="300" task-queue-capacity="20000" jndi-name="concurrent/__defaultManagedExecutorService" object-type="system-all"></managed-executor-service>

デフォルトの ManagedExecutorService は単一のスレッドのみを提供します。これは、Glassfish にも当てはまります。これは、私を混乱させていたのはプールサイズだけだったのに、スレッドモデルを理解していないと思って何時間も走り回っていました.

于 2016-08-15T05:57:38.190 に答える