package org.yzh.commons.spring;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import org.yzh.commons.util.Exceptions;
import org.yzh.commons.util.StrUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Registry of Server-Sent-Event connections.
 *
 * <p>Every connection created by {@link #connect()} gets a generated user id and a single
 * {@link FluxSink}. That sink is indexed twice: by user id in {@link #USERS} and by event name
 * in {@link #EVENTS}. A background thread started from {@link #run(ApplicationArguments)}
 * broadcasts a {@code "heartbeat"} event every 30 seconds.
 */
@Slf4j
@Service
@ConditionalOnClass(name = "reactor.core.publisher.Flux")
public class SSEService implements ApplicationRunner {

    /** Event name under which every new connection is registered by {@link #connect()}. */
    private static final String BASE_EVENT = "0";

    /** Pause between two heartbeat broadcasts. */
    private static final Duration HEARTBEAT_INTERVAL = Duration.ofSeconds(30);

    /** Timeout applied to the connection stream returned by {@link #connect()}. */
    private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(60);

    /** event name -> (user id -> sink of that user). */
    public final ConcurrentHashMap<String, Map<String, FluxSink<Object>>> EVENTS = new ConcurrentHashMap<>();

    /** user id -> (event name -> sink of that user). */
    public final ConcurrentHashMap<String, Map<String, FluxSink<Object>>> USERS = new ConcurrentHashMap<>();

    /**
     * Sends {@code message} to every sink registered under {@code event}.
     * Does nothing when no sink is registered for that event.
     *
     * @param event   event name, also used as the SSE {@code event} field
     * @param message payload placed in the SSE {@code data} field
     */
    public void send(String event, Object message) {
        Map<String, FluxSink<Object>> emitters = EVENTS.get(event);
        if (emitters == null) {
            return;
        }
        ServerSentEvent<Object> sse = buildEvent(event, message);
        for (FluxSink<Object> emitter : emitters.values()) {
            emitter.next(sse);
        }
    }

    /**
     * Sends {@code message} to one user.
     *
     * <p>The message goes to the first sink found in the user's map; whether the user is
     * registered for {@code event} is not checked.
     *
     * @param user    user id as emitted by {@link #connect()}
     * @param event   value of the SSE {@code event} field
     * @param message payload placed in the SSE {@code data} field
     */
    public void send(String user, String event, Object message) {
        Map<String, FluxSink<Object>> emitters = USERS.get(user);
        if (emitters == null) {
            return;
        }
        FluxSink<Object> emitter = firstSink(emitters);
        if (emitter != null) {
            emitter.next(buildEvent(event, message));
        }
    }

    /**
     * Sends {@code message} once to every user present in {@link #USERS}.
     *
     * @param event   value of the SSE {@code event} field
     * @param message payload placed in the SSE {@code data} field (may be {@code null})
     */
    public void broadcast(String event, Object message) {
        ServerSentEvent<Object> sse = buildEvent(event, message);
        for (Map<String, FluxSink<Object>> user : USERS.values()) {
            // One emission per user: only the first sink of each user map is used.
            FluxSink<Object> emitter = firstSink(user);
            if (emitter != null) {
                emitter.next(sse);
            }
        }
    }

    /**
     * Removes the (user, event) registration from both indexes and drops index entries
     * that become empty. A {@code null} argument makes the call a no-op.
     *
     * @param user  user id
     * @param event event name
     */
    public void delEvent(String user, String event) {
        if (user == null || event == null) {
            return;
        }

        Map<String, FluxSink<Object>> events = USERS.get(user);
        if (events != null) {
            events.remove(event);
            if (events.isEmpty()) {
                USERS.remove(user);
            }
        }

        Map<String, FluxSink<Object>> users = EVENTS.get(event);
        if (users != null) {
            users.remove(user);
            if (users.isEmpty()) {
                EVENTS.remove(event);
            }
        }
    }

    /**
     * Registers an existing user for an additional event, reusing the user's current sink.
     *
     * @param user  user id
     * @param event event name to add
     * @return {@code true} when {@code user} is present in {@link #USERS}; {@code false} when it
     *         is not or when an argument is {@code null}. Note that {@code true} is also returned
     *         when the user is present but has no sink yet, in which case nothing is registered.
     */
    public boolean addEvent(String user, String event) {
        if (user == null || event == null) {
            return false;
        }
        Map<String, FluxSink<Object>> result = USERS.computeIfPresent(user, (id, events) -> {
            FluxSink<Object> emitter = firstSink(events);
            if (emitter != null) {
                events.put(event, emitter);
                EVENTS.computeIfAbsent(event, k -> new ConcurrentHashMap<>()).put(user, emitter);
            }
            return events;
        });
        return result != null;
    }

    /**
     * Opens a new SSE connection.
     *
     * <p>A fresh id is obtained from {@code StrUtils.simpleUUID()} and registered in
     * {@link #USERS}. On subscription the sink is stored under event {@code "0"} in both
     * indexes and a {@code "start"} event carrying the id is emitted. All registrations of
     * the id are removed when the sink is disposed or the stream terminates.
     *
     * <p>The stream is wrapped in a 60 second {@code timeout}; the 30 second heartbeat
     * broadcast is what normally keeps it below that limit.
     *
     * @return the event stream, or a stream holding the single string {@code "repeated"} when
     *         the generated id was already registered
     */
    public Flux<Object> connect() {
        String userId = StrUtils.simpleUUID();
        ConcurrentHashMap<String, FluxSink<Object>> events = new ConcurrentHashMap<>();
        // NOTE(review): the id is registered here, before anyone subscribes. If the returned
        // Flux is never subscribed, the (empty) entry is not removed by the cleanup below.
        if (USERS.putIfAbsent(userId, events) != null) {
            return Flux.just("repeated");
        }

        return Flux.create(emitter -> {
                    emitter.onDispose(() -> release(userId, events));
                    events.put(BASE_EVENT, emitter);
                    EVENTS.computeIfAbsent(BASE_EVENT, k -> new ConcurrentHashMap<>()).put(userId, emitter);
                    emitter.next(ServerSentEvent.builder().event("start").data(userId).build());
                })
                .doFinally(signal -> Exceptions.sneaky(() -> release(userId, events)))
                .timeout(CONNECT_TIMEOUT);
    }

    /**
     * Starts the daemon thread that broadcasts the {@code "heartbeat"} event.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        Thread thread = new Thread(this::heartbeatLoop);
        thread.setName(Thread.currentThread().getName() + "-SSEService");
        thread.setPriority(Thread.MIN_PRIORITY);
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Body of the heartbeat thread: sleep, broadcast, repeat until interrupted.
     * A failing broadcast is logged and does not end the loop.
     */
    private void heartbeatLoop() {
        long intervalMillis = HEARTBEAT_INTERVAL.toMillis();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(intervalMillis);
                broadcast("heartbeat", null);
            } catch (InterruptedException e) {
                // Honour the stop request instead of swallowing it and looping forever.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable e) {
                // Best effort: keep the heartbeat alive, but leave a trace of the failure.
                log.warn("SSE heartbeat broadcast failed", e);
            }
        }
    }

    /** Removes every event registration currently held in {@code events} for {@code userId}. */
    private void release(String userId, Map<String, FluxSink<Object>> events) {
        events.keySet().forEach(event -> delEvent(userId, event));
    }

    /** Builds the SSE frame with the given {@code event} name and {@code data} payload. */
    private static ServerSentEvent<Object> buildEvent(String event, Object message) {
        return ServerSentEvent.builder().event(event).data(message).build();
    }

    /** Returns the first sink found in {@code sinks}, or {@code null} when the map is empty. */
    private static FluxSink<Object> firstSink(Map<String, FluxSink<Object>> sinks) {
        for (FluxSink<Object> sink : sinks.values()) {
            return sink;
        }
        return null;
    }
}