‘liusuyi’
2023-12-28 8e384e65b3fc9b0c9116cf2ec2c11e3ad3bc361e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package com.ard.utils.netty.tcp;
 
/**
 * @Description: Netty TCP client initialization
 * @ClassName: ClientInitialize
 * @Author: 刘苏义
 * @Date: 2023-07-05 13:11
 * @Version: 1.0
 **/
 
import com.ard.alarm.radar.domain.ArdEquipRadar;
import com.ard.alarm.radar.service.IArdEquipRadarService;
import com.ard.utils.netty.config.NettyTcpConfiguration;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
 
/**
 * 客户端初始化
 *
 * @author lijiamin
 */
@Component
@Slf4j(topic = "netty")
@Order(2)
public class ClientInitialize implements ApplicationRunner {
    @Resource
    NettyTcpConfiguration nettyTcpConfig;
    @Resource
    IArdEquipRadarService ardEquipRadarService;
 
    private Bootstrap bootstrap;
    public static CopyOnWriteArraySet<ArdEquipRadar> falseConnectSet = new CopyOnWriteArraySet();
    public static ConcurrentHashMap<String, ArdEquipRadar> tureConnectMap = new ConcurrentHashMap();
    public static ConcurrentHashMap<String, Object> SuccessConnectMap = new ConcurrentHashMap();
    /**
     * Netty初始化配置
     */
    public void initNettyTcp() {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });
 
        // 异步持续监听连接失败的地址
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        if (falseConnectSet.size() != 0) {
                            // 循环集合内元素
                            falseConnectSet.forEach(new Consumer<ArdEquipRadar>() {
                                @Override
                                public void accept(ArdEquipRadar ardEquipRadar) {
                                    connectServer(ardEquipRadar);
                                }
                            });
                        }
                        Thread.sleep(10000);
                    } catch (Exception e) {
                        log.error("Netty初始化配置监听地址出现异常");
                        e.printStackTrace();
                    }
                }
            }
        });
    }
 
    /**
     * 服务连接
     *
     * @param ardEquipRadar
     */
    public void connectServer(ArdEquipRadar ardEquipRadar) {
        // 获取地址及端口
        String host = ardEquipRadar.getIp();
        Integer port = ardEquipRadar.getPort();
        // 异步连接tcp服务端
        bootstrap.remoteAddress(host, port).connect().addListener((ChannelFuture futureListener) -> {
            if (!futureListener.isSuccess()) {
                log.debug("雷达【" + host + ":" + port + "】连接失败");
                futureListener.channel().close();
                // 连接失败信息插入Set
                falseConnectSet.add(ardEquipRadar);
                // 连接失败信息从map移除
                tureConnectMap.remove( host + ":" + port);
                SuccessConnectMap.remove(ardEquipRadar.getId());
            } else {
                log.debug("雷达【" + host + ":" + port + "】连接成功");
                // 连接成功信息从Set拔除
                falseConnectSet.remove(ardEquipRadar);
                // 连接成功信息写入map
                tureConnectMap.put(host+":"+port, ardEquipRadar);
                SuccessConnectMap.put(ardEquipRadar.getId(),futureListener.channel());
            }
        });
    }
 
    /**
     * 初始化方法
     */
    @Override
    public void run(ApplicationArguments args) {
        if (!nettyTcpConfig.getEnabled()) {
            return;
        }
        initNettyTcp();//初始化nettyTcp
        List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
        for (ArdEquipRadar ardEquipRadar : ardEquipRadars) {
            String host = ardEquipRadar.getIp();
            Integer port = Integer.valueOf(ardEquipRadar.getPort());
            log.debug("TCP client try to connect radar【" + host + ":" + port+"】");
            connectServer(ardEquipRadar);//连接每一个雷达服务
        }
    }
}