@CuiPengFei、
そのため、これに対する答えを自分で見つけようとして旅行中に、切断されたクライアントからの接続を適切にクリーンアップする方法を説明するリポジトリに出くわしました。
すべての SSE EventOutput ロジックを Service/Manager にカプセル化します。この中で、EventOutput がクライアントによって閉じられたかどうかを確認するスレッドを起動します。その場合、正式に接続を閉じます (EventOutput#close())。そうでない場合は、ストリームに書き込もうとします。例外がスローされた場合、クライアントは閉じられずに切断されており、それを閉じる処理を行います。書き込みが成功すると、まだアクティブな接続であるため、EventOutput がプールに返されます。
リポジトリ (および実際のクラス) は、こちらから入手できます。リポジトリが削除された場合に備えて、以下のインポートなしのクラスも含めました。
これをシングルトンにバインドすることに注意してください。ストアはグローバルに一意である必要があります。
public class SseWriteManager {
private final ConcurrentHashMap<String, EventOutput> connectionMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService messageExecutorService;
private final Logger logger = LoggerFactory.getLogger(SseWriteManager.class);
public SseWriteManager() {
messageExecutorService = Executors.newScheduledThreadPool(1);
messageExecutorService.scheduleWithFixedDelay(new messageProcessor(), 0, 5, TimeUnit.SECONDS);
}
public void addSseConnection(String id, EventOutput eventOutput) {
logger.info("adding connection for id={}.", id);
connectionMap.put(id, eventOutput);
}
private class messageProcessor implements Runnable {
@Override
public void run() {
try {
Iterator<Map.Entry<String, EventOutput>> iterator = connectionMap.entrySet().iterator();
while (iterator.hasNext()) {
boolean remove = false;
Map.Entry<String, EventOutput> entry = iterator.next();
EventOutput eventOutput = entry.getValue();
if (eventOutput != null) {
if (eventOutput.isClosed()) {
remove = true;
} else {
try {
logger.info("writing to id={}.", entry.getKey());
eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build());
} catch (Exception ex) {
logger.info(String.format("write failed to id=%s.", entry.getKey()), ex);
remove = true;
}
}
}
if (remove) {
// we are removing the eventOutput. close it is if it not already closed.
if (!eventOutput.isClosed()) {
try {
eventOutput.close();
} catch (Exception ex) {
// do nothing.
}
}
iterator.remove();
}
}
} catch (Exception ex) {
logger.error("messageProcessor.run threw exception.", ex);
}
}
}
public void shutdown() {
if (messageExecutorService != null && !messageExecutorService.isShutdown()) {
logger.info("SseWriteManager.shutdown: calling messageExecutorService.shutdown.");
messageExecutorService.shutdown();
} else {
logger.info("SseWriteManager.shutdown: messageExecutorService == null || messageExecutorService.isShutdown().");
}
}}