package com.ard.utils.netty.tcp;
|
|
import com.ard.alarm.radar.domain.ArdEquipRadar;
|
import com.ard.alarm.radar.service.IArdEquipRadarService;
|
import com.ard.utils.netty.tcp.handler.*;
|
import io.netty.bootstrap.Bootstrap;
|
import io.netty.channel.*;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.util.*;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.TimeUnit;
|
|
/**
|
* 客户端初始化
|
*
|
* @author lijiamin
|
*/
|
@Component
|
@Slf4j(topic = "netty")
|
public class RadarNettyTcpClient {
|
|
@Resource
|
IArdEquipRadarService ardEquipRadarService;
|
|
@Autowired
|
@Qualifier("exec")
|
ThreadPoolTaskExecutor executorService;
|
|
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
|
|
public static ConcurrentHashMap<String, MessageHandler> SucMessageHandlerMap = new ConcurrentHashMap();//成功连接的ip端口对应的报文解析器
|
public static ConcurrentHashMap<String, Integer> aroundScanfMap = new ConcurrentHashMap();//是否启动周视
|
public static final Map<String, Channel> SERVER_MAP = new ConcurrentHashMap();
|
public static final Map<String, ArdEquipRadar> RADAR_MAP = new ConcurrentHashMap();
|
|
/**
|
* 初始化方法
|
*/
|
public void initAllChannel(List<Map<String, Object>> portHosts) {
|
for (Map<String, Object> map : portHosts) {
|
String host = (String) map.get("IP");
|
Integer port = (Integer) map.get("PORT");
|
try {
|
connectServer(host, port);
|
} catch (Exception ex) {
|
log.error(ex.getMessage());
|
}
|
}
|
}
|
|
/**
|
* 初始化Bootstrap
|
* 刘苏义
|
* 2024/3/25 9:53:57
|
*/
|
public Bootstrap getBootstrap(EventLoopGroup group) {
|
if (null == group) {
|
group = eventLoopGroup;
|
}
|
Bootstrap bootstrap = new Bootstrap();
|
bootstrap.group(group)
|
.channel(NioSocketChannel.class)
|
.option(ChannelOption.TCP_NODELAY, true)
|
.option(ChannelOption.SO_KEEPALIVE, true)
|
.handler(new ChannelInitializer<SocketChannel>() {
|
@Override
|
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
ChannelPipeline pipeline = socketChannel.pipeline();
|
pipeline.addLast("handler", new ClientHandler());
|
}
|
});
|
return bootstrap;
|
}
|
|
/**
|
* 服务连接
|
* 刘苏义
|
* 2024/3/25 9:52:28
|
*/
|
public void connectServer(String host, Integer port) {
|
try {
|
//异步连接tcp服务端
|
Bootstrap bootstrap = getBootstrap(null);
|
bootstrap.remoteAddress(host, port);
|
ChannelFuture channelFuture = bootstrap.connect().sync();
|
if (channelFuture.isSuccess()) {
|
log.info("雷达【" + host + ":" + port + "】连接成功");
|
String serverKey = String.format("%s:%d", host, port);
|
SERVER_MAP.put(serverKey, channelFuture.channel());
|
SucMessageHandlerMap.put(serverKey, new MessageHandler());
|
}
|
channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
String serverKey = String.format("%s:%d", host, port);
|
SERVER_MAP.remove(serverKey);
|
//服务端离线,触发重连操作
|
reconnect(host, port);
|
}
|
});
|
} catch (Exception e) {
|
reconnect(host, port);
|
throw new RuntimeException("雷达【" + host + ":" + port + "】连接异常:" + e.getMessage());
|
}
|
}
|
|
/**
|
* 重连服务端
|
* 刘苏义
|
* 2024/3/25 9:52:15
|
*/
|
public void reconnect(final String host, final int port) {
|
log.debug("雷达【" + host + ":" + port + "】连接失败,进行重连...");
|
executorService.submit(new Runnable() {
|
public void run() {
|
for (; ; ) {
|
try {
|
TimeUnit.SECONDS.sleep(3);
|
connectServer(host, port);
|
if (SERVER_MAP.containsKey(String.format("%s:%d", host, port))) {
|
break;
|
}
|
} catch (Exception e) {
|
log.error(e.getMessage());
|
break;
|
}
|
}
|
}
|
});
|
}
|
|
/**
|
* 监测雷达连接状态
|
*/
|
@Scheduled(cron = "0/3 * * * * ?")
|
public void monitorConnectStatus() {
|
try {
|
log.debug("实时监测雷达连接状态...");
|
List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
|
ardEquipRadars.stream().forEach(ardEquipRadar -> {
|
//判断与雷达客户端连接状态
|
boolean online = SERVER_MAP.containsKey(ardEquipRadar.getIp() + ":" + ardEquipRadar.getPort());
|
if (online) {
|
//连接雷达客户端成功 判断周扫状态
|
if (aroundScanfMap.containsKey(ardEquipRadar.getIp() + ":" + ardEquipRadar.getPort())) {
|
Integer state = aroundScanfMap.get(ardEquipRadar.getIp() + ":" + ardEquipRadar.getPort());
|
if (state == 1) {//1-周扫打开-连接成功
|
if (ardEquipRadar.getState() == null || !ardEquipRadar.getState().equals("1")) {
|
ardEquipRadar.setState("1");
|
ardEquipRadarService.updateArdEquipRadar(ardEquipRadar);
|
}
|
} else {
|
if (ardEquipRadar.getState() == null || !ardEquipRadar.getState().equals("2")) {
|
//2-周扫未开
|
ardEquipRadar.setState("2");
|
ardEquipRadarService.updateArdEquipRadar(ardEquipRadar);
|
}
|
|
}
|
} else {
|
if (ardEquipRadar.getState() == null || !ardEquipRadar.getState().equals("2")) {
|
//2-周扫未开
|
ardEquipRadar.setState("2");
|
ardEquipRadarService.updateArdEquipRadar(ardEquipRadar);
|
}
|
}
|
} else {
|
//连接雷达客户端失败
|
if (ardEquipRadar.getState() == null || !ardEquipRadar.getState().equals("0")) {
|
//0-客户端不通
|
ardEquipRadar.setState("0");
|
ardEquipRadarService.updateArdEquipRadar(ardEquipRadar);
|
}
|
}
|
});
|
} catch (Exception e) {
|
log.error("定时监测雷达连接状态异常:" + e.getMessage());
|
}
|
}
|
}
|