package com.ard.utils.netty.tcp; import com.ard.alarm.radar.domain.ArdEquipRadar; import com.ard.alarm.radar.service.IArdEquipRadarService; import com.ard.utils.netty.config.NettyTcpConfiguration; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.function.Consumer; /** * 客户端初始化 * * @author lijiamin */ @Component @Slf4j(topic = "netty") @Order(2) public class ClientInitialize implements ApplicationRunner { @Resource NettyTcpConfiguration nettyTcpConfig; @Resource IArdEquipRadarService ardEquipRadarService; private Bootstrap bootstrap; public static CopyOnWriteArraySet falseConnectSet = new CopyOnWriteArraySet();//失败连接的雷达Set public static ConcurrentHashMap trueConnectMap = new ConcurrentHashMap();//成功连接的ip端口对应的雷达 public static ConcurrentHashMap SucMessageHandlerMap = new ConcurrentHashMap();//成功连接的ip端口对应的报文解析器 public static ConcurrentHashMap SucChannelMap = new ConcurrentHashMap();//成功连接的ip端口对应的netty通道 /** * Netty初始化配置 */ public void initNettyTcp() { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline().addLast(new ClientHandler()); } }); //异步持续监听连接失败的地址 CompletableFuture.runAsync(new Runnable() { @Override public void run() { while (true) { try { if (falseConnectSet.size() != 0) { // 循环集合内元素 falseConnectSet.forEach(new Consumer() { @Override public void accept(ArdEquipRadar radar) { connectServer(radar); } }); } Thread.sleep(10000); } catch (Exception e) { log.error("Netty初始化配置监听地址出现异常"); e.printStackTrace(); } } } }); } /** * 服务连接 * * @param ardEquipRadar */ public void connectServer(ArdEquipRadar ardEquipRadar) { // 获取地址及端口 String host = ardEquipRadar.getIp(); Integer port = ardEquipRadar.getPort(); String ipPort = host + ":" + port; // 异步连接tcp服务端 bootstrap.remoteAddress(host, port).connect().addListener((ChannelFuture futureListener) -> { if (futureListener.isSuccess()) { log.debug("雷达【" + ipPort + "】连接成功"); // 连接成功信息从Set拔除 falseConnectSet.remove(ardEquipRadar); // 连接成功信息写入map trueConnectMap.put(ipPort, ardEquipRadar); SucMessageHandlerMap.put(ipPort, new MessageHandler()); SucChannelMap.put(ipPort, futureListener.channel()); } else { log.debug("雷达【" + ipPort + "】连接失败"); futureListener.channel().close(); // 连接失败信息插入Set falseConnectSet.add(ardEquipRadar); } }); } /** * 初始化方法 */ @Override public void run(ApplicationArguments args) { if (!nettyTcpConfig.getEnabled()) { return; } initNettyTcp();//初始化nettyTcp List ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar()); for (ArdEquipRadar ardEquipRadar : ardEquipRadars) { String host = ardEquipRadar.getIp(); Integer port = Integer.valueOf(ardEquipRadar.getPort()); log.debug("TCP client try to connect radar【" + host + ":" + port + "】"); connectServer(ardEquipRadar);//连接每一个雷达服务 } } /** * 监测雷达连接状态 */ @Scheduled(cron = "0/5 * * * * ?") public void monitorConnectStatus() { log.debug("定时监测雷达连接状态"); List ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar()); ardEquipRadars.stream().forEach(ardEquipRadar -> { if (trueConnectMap.containsKey(ardEquipRadar.getIp() + ":" + ardEquipRadar.getPort())) { if (ardEquipRadar.getState() != null && !ardEquipRadar.getState().equals("1")) { ardEquipRadar.setState("1"); ardEquipRadarService.updateArdEquipRadar(ardEquipRadar); } } else { if (ardEquipRadar.getState() == null || (ardEquipRadar.getState() != null && !ardEquipRadar.getState().equals("0"))) { ardEquipRadar.setState("0"); ardEquipRadarService.updateArdEquipRadar(ardEquipRadar); } } }); } }