1

ジャージーを使用して SSE シナリオを実装しています。

サーバーは接続を維持します。また、データを定期的にクライアントにプッシュします。

私のシナリオでは、接続制限があり、特定の数のクライアントのみが同時にサーバーにサブスクライブできます。

したがって、新しいクライアントがサブスクライブしようとしているときは、check(EventOutput.isClosed) を実行して、古い接続がアクティブでなくなっているかどうかを確認し、新しい接続のためのスペースを確保できるようにします。

ただし、クライアントが明示的に EventSource の close を呼び出さない限り、EventOutput.isClosed の結果は常に false です。これは、クライアントが偶発的にドロップした場合 (停電またはインターネットの切断)、依然として接続を占有しており、新しいクライアントがサブスクライブできないことを意味します。

これに対する回避策はありますか?

4

2 に答える 2

6

@CuiPengFei、

そのため、これに対する答えを自分で見つけようとして旅行中に、切断されたクライアントからの接続を適切にクリーンアップする方法を説明するリポジトリに出くわしました。

すべての SSE EventOutput ロジックを Service/Manager にカプセル化します。この中で、EventOutput がクライアントによって閉じられたかどうかを確認するスレッドを起動します。その場合、正式に接続を閉じます (EventOutput#close())。そうでない場合は、ストリームに書き込もうとします。例外がスローされた場合、クライアントは閉じられずに切断されており、それを閉じる処理を行います。書き込みが成功すると、まだアクティブな接続であるため、EventOutput がプールに返されます。

リポジトリ (および実際のクラス) は、こちらから入手できます。リポジトリが削除された場合に備えて、以下のインポートなしのクラスも含めました。

これをシングルトンにバインドすることに注意してください。ストアはグローバルに一意である必要があります。

public class SseWriteManager {

private final ConcurrentHashMap<String, EventOutput> connectionMap = new ConcurrentHashMap<>();

private final ScheduledExecutorService messageExecutorService;

private final Logger logger = LoggerFactory.getLogger(SseWriteManager.class);

public SseWriteManager() {
    messageExecutorService = Executors.newScheduledThreadPool(1);
    messageExecutorService.scheduleWithFixedDelay(new messageProcessor(), 0, 5, TimeUnit.SECONDS);
}

public void addSseConnection(String id, EventOutput eventOutput) {
    logger.info("adding connection for id={}.", id);
    connectionMap.put(id, eventOutput);
}

private class messageProcessor implements Runnable {
    @Override
    public void run() {
        try {
            Iterator<Map.Entry<String, EventOutput>> iterator = connectionMap.entrySet().iterator();
            while (iterator.hasNext()) {
                boolean remove = false;
                Map.Entry<String, EventOutput> entry = iterator.next();
                EventOutput eventOutput = entry.getValue();
                if (eventOutput != null) {
                    if (eventOutput.isClosed()) {
                        remove = true;
                    } else {
                        try {
                            logger.info("writing to id={}.", entry.getKey());
                            eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build());
                        } catch (Exception ex) {
                            logger.info(String.format("write failed to id=%s.", entry.getKey()), ex);
                            remove = true;
                        }
                    }
                }
                if (remove) {
                    // we are removing the eventOutput. close it is if it not already closed.
                    if (!eventOutput.isClosed()) {
                        try {
                            eventOutput.close();
                        } catch (Exception ex) {
                            // do nothing.
                        }
                    }
                    iterator.remove();
                }
            }
        } catch (Exception ex) {
            logger.error("messageProcessor.run threw exception.", ex);
        }
    }
}

public void shutdown() {
    if (messageExecutorService != null && !messageExecutorService.isShutdown()) {
        logger.info("SseWriteManager.shutdown: calling messageExecutorService.shutdown.");
        messageExecutorService.shutdown();
    } else {
        logger.info("SseWriteManager.shutdown: messageExecutorService == null || messageExecutorService.isShutdown().");
    }

}} 
于 2015-11-20T03:35:32.763 に答える
2

これに関する最新情報を提供したかった:

何が起きていたかというと、クライアント側 (js) の eventSource は、新しいサブスクリプションが追加されるとすぐにブロードキャストを行わない限り、readyState '1' にならないということです。この状態でも、クライアントはサーバーからプッシュされたデータを受け取ることができました。単純な「OK」メッセージのブロードキャストを行う呼び出しを追加すると、eventSource を readyState 1 にするのに役立ちました。

クライアント側からの接続を閉じるとき。リソースのクリーンアップを積極的に行うには、クライアント側で eventSource を閉じるだけでは役に立ちません。サーバーにブロードキャストを強制するために、サーバーに対して別の ajax 呼び出しを行う必要があります。ブロードキャストが強制されると、jersey はアクティブでなくなった接続をクリーンアップし、次にリソースを解放します (CLOSE_WAIT の接続)。そうでない場合、次のブロードキャストが発生するまで、接続は CLOSE_WAIT に残ります。

于 2015-05-25T20:56:54.490 に答える