wangmengmeng
2025-04-26 96250617dbbefce55b5966c94880e2b07b6c98df
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package com.dji.sample.component.websocketWmm;
 
import com.dji.sample.component.websocket.model.BizCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
 
@Slf4j
@Component
@ServerEndpoint(value = "/ws")
public class WebSocketServerPlayBack {
 
 
 
    /** 记录链接在线数量 **/
    private static final AtomicInteger onlineCount = new AtomicInteger(0);
 
 
    /** 存放每个客户端对应的 WebSocketServer 对象 **/
    private static CopyOnWriteArraySet<WebSocketServerPlayBack> webSocketSet = new CopyOnWriteArraySet<>();
 
 
    /** 与某个客户端的连接会话,需要通过它来给客户端发送数据 **/
    private Session session;
 
 
    /** 心跳报文 **/
    private static final String HEARTBEAT_PACKETS = "The heartbeat packets";
 
 
    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        // 加入set中
        webSocketSet.add(this);
        // 在线数加1
        onlineCount.getAndIncrement();
 
    }
 
 
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        // 从set中删除
        webSocketSet.remove(this);
        // 在线数减1
        onlineCount.getAndDecrement();
    }
 
    /**
     * 发生错误时调用
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("[历史数据回放] - WS 异常断开", session, error);
    }
 
 
    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        if (HEARTBEAT_PACKETS.equals(message)) {
            log.debug("[消息订阅] - 心跳.");
            return;
        }
        // TODO 接收前端入参后的业务处理
 
    }
 
    /**
     * 群发自定义消息
     */
    public static void sendInfo(BizCodeEnum bizCode, String data) {
        for (WebSocketServerPlayBack item : webSocketSet) {
            try {
//                String message="WebSocketMessageResponse{" +
                  String message="{" +
                        "\"bizCode\":\"" + bizCode +
                        "\", \"timestamp\":" + System.currentTimeMillis() +
                        ", \"data\":" + data +
                        '}';
                item.sendMessage(message);
            } catch (IOException e) {
                log.error("[NVR 数据对接] - 数据推送异常, 数据: [{}].", bizCode, e);
                continue;
            }
        }
    }
 
    /**
     * 群发自定义消息
     */
    public static void sendInfo(String message) {
        for (WebSocketServerPlayBack item : webSocketSet) {
            try {
 
                item.sendMessage(message);
            } catch (IOException e) {
                log.error("[NVR 数据对接] - 数据推送异常, 数据: [{}].", message, e);
                continue;
            }
        }
    }
 
 
    /**
     * 指定会话推送
     * @param message
     */
    public static void sendInfo(Session session, String message) {
        for (WebSocketServerPlayBack item : webSocketSet) {
            try {
                if (null != session && item.session.equals(session)) {
                    item.sendMessage(message);
                }
 
            } catch (IOException e) {
                log.error("[数据对接] - 数据推送异常, 数据: [{}].", message, e);
                continue;
            }
        }
    }
 
    /**
     * 向链接推送消息
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }
 
}