‘liusuyi’
2024-02-01 b587ba125adcee0631bc816540779cca56f9099b
src/main/java/com/ard/utils/netty/tcp/ClientInitialize.java
@@ -1,141 +1,159 @@
package com.ard.utils.netty.tcp;
/**
 * @Description:
 * @ClassName: init
 * @Author: 刘苏义
 * @Date: 2023年07月05日13:11
 * @Version: 1.0
 **/
import com.ard.alarm.radar.domain.ArdEquipRadar;
import com.ard.alarm.radar.service.IArdEquipRadarService;
import com.ard.utils.netty.config.NettyTcpConfiguration;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
/**
 * 客户端初始化
 *
 * @author lijiamin
 */
@Component
@Slf4j(topic = "netty")
@Order(2)
public class ClientInitialize implements ApplicationRunner{
    @Resource
    NettyTcpConfiguration nettyTcpConfig;
    @Resource
    IArdEquipRadarService ardEquipRadarService;
    private Bootstrap bootstrap;
    public static CopyOnWriteArraySet<ArdEquipRadar> falseConnectSet = new CopyOnWriteArraySet();
    public static ConcurrentHashMap<String, ArdEquipRadar> tureConnectMap = new ConcurrentHashMap();
    public static ConcurrentHashMap<String, Object> SuccessConnectMap = new ConcurrentHashMap();
    public static ConcurrentHashMap<String, MessageParsing> MessageMap = new ConcurrentHashMap();
    /**
     * Netty初始化配置
     */
    public void initNettyTcp() {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });
        // 异步持续监听连接失败的地址
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        if (falseConnectSet.size() != 0) {
                            // 循环集合内元素
                            falseConnectSet.forEach(new Consumer<ArdEquipRadar>() {
                                @Override
                                public void accept(ArdEquipRadar ardEquipRadar) {
                                    connectServer(ardEquipRadar);
                                }
                            });
                        }
                        Thread.sleep(10000);
                    } catch (Exception e) {
                        log.error("Netty初始化配置监听地址出现异常");
                        e.printStackTrace();
                    }
                }
            }
        });
    }
    /**
     * 服务连接
     *
     * @param ardEquipRadar
     */
    public void connectServer(ArdEquipRadar ardEquipRadar) {
        // 获取地址及端口
        String host = ardEquipRadar.getIp();
        Integer port = ardEquipRadar.getPort();
        // 异步连接tcp服务端
        bootstrap.remoteAddress(host, port).connect().addListener((ChannelFuture futureListener) -> {
            if (!futureListener.isSuccess()) {
                log.debug("雷达【" + host + ":" + port + "】连接失败");
                futureListener.channel().close();
                // 连接失败信息插入Set
                falseConnectSet.add(ardEquipRadar);
                // 连接失败信息从map移除
                tureConnectMap.remove( host + ":" + port);
                SuccessConnectMap.remove(ardEquipRadar.getId());
            } else {
                log.debug("雷达【" + host + ":" + port + "】连接成功");
                // 连接成功信息从Set拔除
                falseConnectSet.remove(ardEquipRadar);
                // 连接成功信息写入map
                tureConnectMap.put(host+":"+port, ardEquipRadar);
                MessageMap.put(host+":"+port,new MessageParsing());
                SuccessConnectMap.put(ardEquipRadar.getId(),futureListener.channel());
            }
        });
    }
    /**
     * 初始化方法
     */
    @Override
    public void run(ApplicationArguments args) {
        if (!nettyTcpConfig.getEnabled()) {
            return;
        }
        initNettyTcp();//初始化nettyTcp
        List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
        for (ArdEquipRadar ardEquipRadar : ardEquipRadars) {
            String host = ardEquipRadar.getIp();
            Integer port = Integer.valueOf(ardEquipRadar.getPort());
            log.debug("TCP client try to connect radar【" + host + ":" + port+"】");
            connectServer(ardEquipRadar);//连接每一个雷达服务
        }
    }
}
package com.ard.utils.netty.tcp;
import com.ard.alarm.radar.domain.ArdEquipRadar;
import com.ard.alarm.radar.service.IArdEquipRadarService;
import com.ard.utils.netty.config.NettyTcpConfiguration;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
/**
 * 客户端初始化
 *
 * @author lijiamin
 */
@Component
@Slf4j(topic = "netty")
@Order(2)
public class ClientInitialize implements ApplicationRunner {
    @Resource
    NettyTcpConfiguration nettyTcpConfig;
    @Resource
    IArdEquipRadarService ardEquipRadarService;
    private Bootstrap bootstrap;
    public static CopyOnWriteArraySet<ArdEquipRadar> falseConnectSet = new CopyOnWriteArraySet();//失败连接的雷达Set
    public static ConcurrentHashMap<String, ArdEquipRadar> trueConnectMap = new ConcurrentHashMap();//成功连接的ip端口对应的雷达
    public static ConcurrentHashMap<String, MessageHandler> SucMessageHandlerMap = new ConcurrentHashMap();//成功连接的ip端口对应的报文解析器
    public static ConcurrentHashMap<String, Channel> SucChannelMap = new ConcurrentHashMap();//成功连接的ip端口对应的netty通道
    /**
     * Netty初始化配置
     */
    public void initNettyTcp() {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });
        //异步持续监听连接失败的地址
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        if (falseConnectSet.size() != 0) {
                            // 循环集合内元素
                            falseConnectSet.forEach(new Consumer<ArdEquipRadar>() {
                                @Override
                                public void accept(ArdEquipRadar radar) {
                                    connectServer(radar);
                                }
                            });
                        }
                        Thread.sleep(10000);
                    } catch (Exception e) {
                        log.error("Netty初始化配置监听地址出现异常");
                        e.printStackTrace();
                    }
                }
            }
        });
    }
    /**
     * 服务连接
     *
     * @param ardEquipRadar
     */
    public void connectServer(ArdEquipRadar ardEquipRadar) {
        // 获取地址及端口
        String host = ardEquipRadar.getIp();
        Integer port = ardEquipRadar.getPort();
        String ipPort = host + ":" + port;
        // 异步连接tcp服务端
        bootstrap.remoteAddress(host, port).connect().addListener((ChannelFuture futureListener) -> {
            if (futureListener.isSuccess()) {
                log.debug("雷达【" + ipPort + "】连接成功");
                // 连接成功信息从Set拔除
                falseConnectSet.remove(ardEquipRadar);
                // 连接成功信息写入map
                trueConnectMap.put(ipPort, ardEquipRadar);
                SucMessageHandlerMap.put(ipPort, new MessageHandler());
                SucChannelMap.put(ipPort, futureListener.channel());
            } else {
                log.debug("雷达【" + ipPort + "】连接失败");
                futureListener.channel().close();
                // 连接失败信息插入Set
                falseConnectSet.add(ardEquipRadar);
            }
        });
    }
    /**
     * 初始化方法
     */
    @Override
    public void run(ApplicationArguments args) {
        if (!nettyTcpConfig.getEnabled()) {
            return;
        }
        initNettyTcp();//初始化nettyTcp
        List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
        for (ArdEquipRadar ardEquipRadar : ardEquipRadars) {
            String host = ardEquipRadar.getIp();
            Integer port = Integer.valueOf(ardEquipRadar.getPort());
            log.debug("TCP client try to connect radar【" + host + ":" + port + "】");
            connectServer(ardEquipRadar);//连接每一个雷达服务
        }
    }
    /**
     * 监测雷达连接状态
     */
    @Scheduled(cron = "0/5 * * * * ?")
    public void monitorConnectStatus() {
        log.debug("定时监测雷达连接状态");
        List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
        ardEquipRadars.stream().forEach(ardEquipRadar ->{
            if(trueConnectMap.contains(ardEquipRadar.getIp()+"_"+ardEquipRadar.getPort()))
            {
                if(!ardEquipRadar.getState().equals("1")) {
                    ardEquipRadar.setState("1");
                    ardEquipRadarService.updateArdEquipRadar(ardEquipRadar);
                }
            }
            else
            {
                if(!ardEquipRadar.getState().equals("0")) {
                    ardEquipRadar.setState("0");
                    ardEquipRadarService.updateArdEquipRadar(ardEquipRadar);
                }
            }
        });
    }
}