package com.ard.utils.netty.tcp; import com.ard.alarm.radar.domain.ArdEquipRadar; import com.ard.alarm.radar.service.IArdEquipRadarService; import com.ard.utils.netty.tcp.handler.*; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * 客户端初始化 * * @author lijiamin */ @Component @Slf4j(topic = "netty") public class RadarNettyTcpClient { @Resource IArdEquipRadarService ardEquipRadarService; @Autowired @Qualifier("exec") ThreadPoolTaskExecutor executorService; EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); public static ConcurrentHashMap SucMessageHandlerMap = new ConcurrentHashMap();//成功连接的ip端口对应的报文解析器 public static ConcurrentHashMap aroundScanfMap = new ConcurrentHashMap();//是否启动周视 public static final Map SERVER_MAP = new ConcurrentHashMap(); public static final Map RADAR_MAP = new ConcurrentHashMap(); /** * 初始化方法 */ public void initAllChannel(List> portHosts) { for (Map map : portHosts) { String host = (String) map.get("IP"); Integer port = (Integer) map.get("PORT"); try { connectServer(host, port); } catch (Exception ex) { log.error(ex.getMessage()); } } } /** * 初始化Bootstrap * 刘苏义 * 2024/3/25 9:53:57 */ public Bootstrap getBootstrap(EventLoopGroup group) { if (null == group) { group = eventLoopGroup; } Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("handler", new ClientHandler()); } }); return bootstrap; } /** * 服务连接 * 刘苏义 * 2024/3/25 9:52:28 */ public void connectServer(String host, Integer port) { try { //异步连接tcp服务端 Bootstrap bootstrap = getBootstrap(null); bootstrap.remoteAddress(host, port); ChannelFuture channelFuture = bootstrap.connect().sync(); if (channelFuture.isSuccess()) { log.info("雷达【" + host + ":" + port + "】连接成功"); String serverKey = String.format("%s:%d", host, port); SERVER_MAP.put(serverKey, channelFuture.channel()); SucMessageHandlerMap.put(serverKey, new MessageHandler()); } channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { String serverKey = String.format("%s:%d", host, port); SERVER_MAP.remove(serverKey); //服务端离线,触发重连操作 reconnect(host, port); } }); } catch (Exception e) { reconnect(host, port); throw new RuntimeException("雷达【" + host + ":" + port + "】连接异常:" + e.getMessage()); } } /** * 重连服务端 * 刘苏义 * 2024/3/25 9:52:15 */ public void reconnect(final String host, final int port) { log.debug("雷达【" + host + ":" + port + "】连接失败,进行重连..."); executorService.submit(new Runnable() { public void run() { for (; ; ) { try { TimeUnit.SECONDS.sleep(3); connectServer(host, port); if (SERVER_MAP.containsKey(String.format("%s:%d", host, port))) { break; } } catch (Exception e) { log.error(e.getMessage()); break; } } } }); } /** * 监测雷达连接状态 */ @Scheduled(cron = "0/3 * * * * ?") public void monitorConnectStatus() { try { log.debug("实时监测雷达连接状态..."); List ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar()); ardEquipRadars.stream().forEach(ardEquipRadar -> { //判断与雷达客户端连接状态 boolean online = SERVER_MAP.containsKey(ardEquipRadar.getIp() + ":" + ardEquipRadar.getPort()); if (online) { //连接雷达客户端成功 判断周扫状态 if (aroundScanfMap.containsKey(ardEquipRadar.getIp() + ":" + ardEquipRadar.getPort())) { Integer state = aroundScanfMap.get(ardEquipRadar.getIp() + ":" + ardEquipRadar.getPort()); if (state == 1) {//1-周扫打开-连接成功 if (ardEquipRadar.getState() == null || !ardEquipRadar.getState().equals("1")) { ardEquipRadar.setState("1"); ardEquipRadarService.updateArdEquipRadar(ardEquipRadar); } } else { if (ardEquipRadar.getState() == null || !ardEquipRadar.getState().equals("2")) { //2-周扫未开 ardEquipRadar.setState("2"); ardEquipRadarService.updateArdEquipRadar(ardEquipRadar); } } } else { if (ardEquipRadar.getState() == null || !ardEquipRadar.getState().equals("2")) { //2-周扫未开 ardEquipRadar.setState("2"); ardEquipRadarService.updateArdEquipRadar(ardEquipRadar); } } } else { //连接雷达客户端失败 if (ardEquipRadar.getState() == null || !ardEquipRadar.getState().equals("0")) { //0-客户端不通 ardEquipRadar.setState("0"); ardEquipRadarService.updateArdEquipRadar(ardEquipRadar); } } }); } catch (Exception e) { log.error("定时监测雷达连接状态异常:" + e.getMessage()); } } }