package com.ard.utils.netty.tcp;
|
|
import com.ard.alarm.radar.domain.ArdEquipRadar;
|
import com.ard.alarm.radar.service.IArdEquipRadarService;
|
import com.ard.utils.netty.config.NettyTcpConfiguration;
|
import io.netty.bootstrap.Bootstrap;
|
import io.netty.channel.*;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.boot.ApplicationArguments;
|
import org.springframework.boot.ApplicationRunner;
|
import org.springframework.core.annotation.Order;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.util.*;
|
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.CopyOnWriteArraySet;
|
import java.util.function.Consumer;
|
|
/**
|
* 客户端初始化
|
*
|
* @author lijiamin
|
*/
|
@Component
|
@Slf4j(topic = "netty")
|
@Order(2)
|
public class ClientInitialize implements ApplicationRunner {
|
@Resource
|
NettyTcpConfiguration nettyTcpConfig;
|
@Resource
|
IArdEquipRadarService ardEquipRadarService;
|
|
private Bootstrap bootstrap;
|
public static CopyOnWriteArraySet<ArdEquipRadar> falseConnectSet = new CopyOnWriteArraySet();//失败连接的雷达Set
|
public static ConcurrentHashMap<String, ArdEquipRadar> trueConnectMap = new ConcurrentHashMap();//成功连接的ip端口对应的雷达
|
public static ConcurrentHashMap<String, MessageHandler> SucMessageHandlerMap = new ConcurrentHashMap();//成功连接的ip端口对应的报文解析器
|
public static ConcurrentHashMap<String, Channel> SucChannelMap = new ConcurrentHashMap();//成功连接的ip端口对应的netty通道
|
|
/**
|
* Netty初始化配置
|
*/
|
public void initNettyTcp() {
|
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
|
bootstrap = new Bootstrap();
|
bootstrap.group(eventLoopGroup)
|
.channel(NioSocketChannel.class)
|
.option(ChannelOption.TCP_NODELAY, true)
|
.option(ChannelOption.SO_KEEPALIVE, true)
|
.handler(new ChannelInitializer<SocketChannel>() {
|
@Override
|
protected void initChannel(SocketChannel socketChannel) {
|
socketChannel.pipeline().addLast(new ClientHandler());
|
}
|
});
|
|
//异步持续监听连接失败的地址
|
CompletableFuture.runAsync(new Runnable() {
|
@Override
|
public void run() {
|
while (true) {
|
try {
|
if (falseConnectSet.size() != 0) {
|
// 循环集合内元素
|
falseConnectSet.forEach(new Consumer<ArdEquipRadar>() {
|
@Override
|
public void accept(ArdEquipRadar radar) {
|
connectServer(radar);
|
}
|
});
|
}
|
Thread.sleep(10000);
|
} catch (Exception e) {
|
log.error("Netty初始化配置监听地址出现异常");
|
e.printStackTrace();
|
}
|
}
|
}
|
});
|
}
|
|
/**
|
* 服务连接
|
*
|
* @param ardEquipRadar
|
*/
|
public void connectServer(ArdEquipRadar ardEquipRadar) {
|
// 获取地址及端口
|
String host = ardEquipRadar.getIp();
|
Integer port = ardEquipRadar.getPort();
|
String ipPort = host + ":" + port;
|
// 异步连接tcp服务端
|
bootstrap.remoteAddress(host, port).connect().addListener((ChannelFuture futureListener) -> {
|
if (futureListener.isSuccess()) {
|
log.debug("雷达【" + ipPort + "】连接成功");
|
// 连接成功信息从Set拔除
|
falseConnectSet.remove(ardEquipRadar);
|
// 连接成功信息写入map
|
trueConnectMap.put(ipPort, ardEquipRadar);
|
SucMessageHandlerMap.put(ipPort, new MessageHandler());
|
SucChannelMap.put(ipPort, futureListener.channel());
|
} else {
|
log.debug("雷达【" + ipPort + "】连接失败");
|
futureListener.channel().close();
|
// 连接失败信息插入Set
|
falseConnectSet.add(ardEquipRadar);
|
|
}
|
});
|
}
|
|
/**
|
* 初始化方法
|
*/
|
@Override
|
public void run(ApplicationArguments args) {
|
if (!nettyTcpConfig.getEnabled()) {
|
return;
|
}
|
initNettyTcp();//初始化nettyTcp
|
List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
|
for (ArdEquipRadar ardEquipRadar : ardEquipRadars) {
|
String host = ardEquipRadar.getIp();
|
Integer port = Integer.valueOf(ardEquipRadar.getPort());
|
log.debug("TCP client try to connect radar【" + host + ":" + port + "】");
|
connectServer(ardEquipRadar);//连接每一个雷达服务
|
}
|
}
|
}
|