18045010223
2025-07-07 0d3a683a0c97154b1f2e6657398664537e4e3e82
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package org.yzh.commons.spring;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import org.yzh.commons.util.Exceptions;
import org.yzh.commons.util.StrUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
 
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
@Slf4j
@Service
@ConditionalOnClass(name = "reactor.core.publisher.Flux")
public class SSEService implements ApplicationRunner {
 
    public final ConcurrentHashMap<String, Map<String, FluxSink<Object>>> EVENTS = new ConcurrentHashMap<>();
    public final ConcurrentHashMap<String, Map<String, FluxSink<Object>>> USERS = new ConcurrentHashMap<>();
 
    public void send(String event, Object message) {
        Map<String, FluxSink<Object>> emitters = EVENTS.get(event);
        if (emitters != null) {
            ServerSentEvent<Object> sse = ServerSentEvent.builder().event(event).data(message).build();
            for (FluxSink<Object> emitter : emitters.values()) {
                emitter.next(sse);
            }
        }
    }
 
    public void send(String user, String event, Object message) {
        Map<String, FluxSink<Object>> emitters = USERS.get(user);
        if (emitters != null) {
            ServerSentEvent<Object> sse = ServerSentEvent.builder().event(event).data(message).build();
            for (FluxSink<Object> emitter : emitters.values()) {
                emitter.next(sse);
                break;
            }
        }
    }
 
    public void broadcast(String event, Object message) {
        ServerSentEvent<Object> sse = ServerSentEvent.builder().event(event).data(message).build();
        for (Map<String, FluxSink<Object>> user : USERS.values()) {
            for (FluxSink<Object> emitter : user.values()) {
                emitter.next(sse);
                break;
            }
        }
    }
 
    public void delEvent(String user, String event) {
        if (user == null || event == null)
            return;
        Map<String, FluxSink<Object>> events = USERS.get(user);
        if (events != null) {
            events.remove(event);
            if (events.isEmpty())
                USERS.remove(user);
        }
        Map<String, FluxSink<Object>> users = EVENTS.get(event);
        if (users != null) {
            users.remove(user);
            if (users.isEmpty())
                EVENTS.remove(event);
        }
    }
 
    public boolean addEvent(String user, String event) {
        if (user == null || event == null)
            return false;
        Map<String, FluxSink<Object>> result = USERS.computeIfPresent(user, (s, events) -> {
            for (FluxSink<Object> emitter : events.values()) {
                events.put(event, emitter);
                EVENTS.computeIfAbsent(event, k -> new ConcurrentHashMap<>()).put(user, emitter);
                break;
            }
            return events;
        });
        return result != null;
    }
 
    public Flux<Object> connect() {
        String userId = StrUtils.simpleUUID();
        ConcurrentHashMap<String, FluxSink<Object>> events = new ConcurrentHashMap<>();
        if (USERS.putIfAbsent(userId, events) == null) {
            return Flux.create(emitter -> {
                        emitter.onDispose(() -> events.keySet().forEach(t -> delEvent(userId, t)));
                        events.put("0", emitter);
                        EVENTS.computeIfAbsent("0", k -> new ConcurrentHashMap<>()).put(userId, emitter);
                        emitter.next(ServerSentEvent.builder().event("start").data(userId).build());
                    }).doFinally(s -> Exceptions.sneaky(() -> events.keySet().forEach(t -> delEvent(userId, t))))
                    .timeout(Duration.ofSeconds(60));
        }
        return Flux.just("repeated");
    }
 
    @Override
    public void run(ApplicationArguments args) {
        Thread thread = new Thread(() -> {
            for (; ; ) {
                try {
                    Thread.sleep(Duration.ofSeconds(30).toMillis());
                    broadcast("heartbeat", null);
                } catch (Throwable ignored) {
                }
            }
        });
        thread.setName(Thread.currentThread().getName() + "-SSEService");
        thread.setPriority(Thread.MIN_PRIORITY);
        thread.setDaemon(true);
        thread.start();
    }
}