‘liusuyi’
2024-03-12 8e0b2b23294ac1fdce43b9076b93e29e9aa51530
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package com.ard.utils.netty.tcp;
 
import com.ard.alarm.radar.domain.ArdEquipRadar;
import com.ard.alarm.radar.service.IArdEquipRadarService;
import com.ard.utils.netty.config.NettyTcpConfiguration;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
 
/**
 * 客户端初始化
 *
 * @author lijiamin
 */
@Component
@Slf4j(topic = "netty")
@Order(2)
public class ClientInitialize implements ApplicationRunner {
    @Resource
    NettyTcpConfiguration nettyTcpConfig;
    @Resource
    IArdEquipRadarService ardEquipRadarService;
 
    private Bootstrap bootstrap;
    public static CopyOnWriteArraySet<ArdEquipRadar> falseConnectSet = new CopyOnWriteArraySet();//失败连接的雷达Set
    public static ConcurrentHashMap<String, ArdEquipRadar> trueConnectMap = new ConcurrentHashMap();//成功连接的ip端口对应的雷达
    public static ConcurrentHashMap<String, MessageHandler> SucMessageHandlerMap = new ConcurrentHashMap();//成功连接的ip端口对应的报文解析器
    public static ConcurrentHashMap<String, Channel> SucChannelMap = new ConcurrentHashMap();//成功连接的ip端口对应的netty通道
 
    /**
     * Netty初始化配置
     */
    public void initNettyTcp() {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });
 
        //异步持续监听连接失败的地址
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        if (falseConnectSet.size() != 0) {
                            // 循环集合内元素
                            falseConnectSet.forEach(new Consumer<ArdEquipRadar>() {
                                @Override
                                public void accept(ArdEquipRadar radar) {
                                    connectServer(radar);
                                }
                            });
                        }
                        Thread.sleep(nettyTcpConfig.getReConnectInterval());
                    } catch (Exception e) {
                        log.error("Netty初始化配置监听地址出现异常");
                        e.printStackTrace();
                    }
                }
            }
        });
    }
 
    /**
     * 服务连接
     *
     * @param ardEquipRadar
     */
    public void connectServer(ArdEquipRadar ardEquipRadar) {
        // 获取地址及端口
        String host = ardEquipRadar.getIp();
        Integer port = ardEquipRadar.getPort();
        String ipPort = host + ":" + port;
        // 异步连接tcp服务端
        bootstrap.remoteAddress(host, port).connect().addListener((ChannelFuture futureListener) -> {
            if (futureListener.isSuccess()) {
                log.debug("雷达【" + ipPort + "】连接成功");
                // 连接成功信息从Set拔除
                falseConnectSet.remove(ardEquipRadar);
                // 连接成功信息写入map
                trueConnectMap.put(ipPort, ardEquipRadar);
                SucMessageHandlerMap.put(ipPort, new MessageHandler());
                SucChannelMap.put(ipPort, futureListener.channel());
            } else {
                log.debug("雷达【" + ipPort + "】连接失败");
                futureListener.channel().close();
                // 连接失败信息插入Set
                falseConnectSet.add(ardEquipRadar);
 
            }
        });
    }
 
    /**
     * 初始化方法
     */
    @Override
    public void run(ApplicationArguments args) {
        if (!nettyTcpConfig.getEnabled()) {
            return;
        }
        initNettyTcp();//初始化nettyTcp
        List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
        for (ArdEquipRadar ardEquipRadar : ardEquipRadars) {
            String host = ardEquipRadar.getIp();
            Integer port = Integer.valueOf(ardEquipRadar.getPort());
            log.debug("TCP client try to connect radar【" + host + ":" + port + "】");
            connectServer(ardEquipRadar);//连接每一个雷达服务
        }
    }
 
    /**
     * 监测雷达连接状态
     */
    @Scheduled(cron = "0/3 * * * * ?")
    public void monitorConnectStatus() {
        log.debug("定时监测雷达连接状态");
        List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
        ardEquipRadars.stream().forEach(ardEquipRadar -> {
 
            boolean online = trueConnectMap.containsKey(ardEquipRadar.getIp() + ":" + ardEquipRadar.getPort());
            if (online) {
                if (ardEquipRadar.getState() == null || !ardEquipRadar.getState().equals("1")) {
                    ardEquipRadar.setState("1");
                    ardEquipRadarService.updateArdEquipRadar(ardEquipRadar);
                }
            } else {
                if (ardEquipRadar.getState() == null || !ardEquipRadar.getState().equals("0")) {
                    ardEquipRadar.setState("0");
                    ardEquipRadarService.updateArdEquipRadar(ardEquipRadar);
                }
            }
        });
    }
}