package com.ard.utils.netty.tcp;

/**
 * @Description: Netty TCP client initialization
 * @ClassName: init
 * @Author: 刘苏义
 * @Date: 2023-07-05 13:11
 * @Version: 1.0
 **/

import com.ard.alarm.radar.domain.ArdEquipRadar;
import com.ard.alarm.radar.service.IArdEquipRadarService;
import com.ard.utils.netty.config.NettyTcpConfiguration;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Client initialization.
 *
 * <p>On application start-up (when the TCP client is enabled in the configuration) this runner
 * builds a Netty {@link Bootstrap}, opens one TCP connection per radar returned by
 * {@link IArdEquipRadarService#selectArdEquipRadarList}, and periodically retries every radar
 * whose connection attempt failed.
 *
 * <p>Connection state is published through three public static collections:
 * {@link #falseConnectSet}, {@link #tureConnectMap} and {@link #SuccessConnectMap}.
 *
 * @author lijiamin
 */
@Component
@Slf4j(topic = "netty")
@Order(2)
public class ClientInitialize implements ApplicationRunner {

    /** Delay, in seconds, between two retry sweeps over {@link #falseConnectSet}. */
    private static final long RETRY_INTERVAL_SECONDS = 10L;

    @Resource
    NettyTcpConfiguration nettyTcpConfig;
    @Resource
    IArdEquipRadarService ardEquipRadarService;

    /** Event loop group backing {@link #bootstrap}; also runs the periodic retry task. */
    private EventLoopGroup eventLoopGroup;
    /** Shared client bootstrap; {@code null} until {@link #initNettyTcp()} has run. */
    private Bootstrap bootstrap;

    /** Radars whose most recent connection attempt failed; retried periodically. */
    public static CopyOnWriteArraySet<ArdEquipRadar> falseConnectSet = new CopyOnWriteArraySet<>();
    /** Connected radars keyed by the id of their channel (field name kept for compatibility). */
    public static ConcurrentHashMap<ChannelId, ArdEquipRadar> tureConnectMap = new ConcurrentHashMap<>();
    /**
     * Connected channels keyed by radar id (field name kept for compatibility).
     * NOTE(review): the key is whatever {@code ArdEquipRadar#getId()} returns; its type is not
     * visible from this file, hence {@code Object} - narrow it once confirmed.
     */
    public static ConcurrentHashMap<Object, Channel> SuccessConnectMap = new ConcurrentHashMap<>();

    /**
     * Netty initialization: creates the event loop group and the client bootstrap, and schedules
     * the periodic retry of failed connections.
     */
    public void initNettyTcp() {
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });
        // Retry failed addresses on the event loop's own scheduler instead of parking a
        // ForkJoinPool thread in an endless sleep loop; the task is cancelled automatically
        // when the group is shut down.
        eventLoopGroup.scheduleWithFixedDelay(this::retryFailedConnections,
                RETRY_INTERVAL_SECONDS, RETRY_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Re-attempts the connection for every radar currently in {@link #falseConnectSet}.
     * Exceptions are caught per radar: one bad entry must not skip the others, and an exception
     * escaping a fixed-delay task would suppress all further executions.
     */
    private void retryFailedConnections() {
        for (ArdEquipRadar failedRadar : falseConnectSet) {
            try {
                connectServer(failedRadar);
            } catch (Exception e) {
                log.error("Netty初始化配置监听地址出现异常", e);
            }
        }
    }

    /**
     * Connects asynchronously to the TCP server of the given radar.
     *
     * <p>On failure the radar is added to {@link #falseConnectSet} and removed from both
     * connection maps; on success it is removed from the set and registered in both maps.
     * The call is skipped (with a warning) when the client has not been initialized or the
     * radar has no ip or port.
     *
     * @param ardEquipRadar radar whose ip and port identify the remote endpoint
     */
    public void connectServer(ArdEquipRadar ardEquipRadar) {
        if (bootstrap == null) {
            log.warn("Netty TCP client is not initialized, skip connecting radar {}", ardEquipRadar.getId());
            return;
        }
        // Read address and port
        String host = ardEquipRadar.getIp();
        Integer port = ardEquipRadar.getPort();
        if (host == null || port == null) {
            log.warn("Radar {} has no ip or port configured, skip connecting", ardEquipRadar.getId());
            return;
        }
        // Connect asynchronously. The address is passed to connect() directly: setting it via
        // remoteAddress() would mutate the shared bootstrap and race with concurrent calls.
        bootstrap.connect(host, port).addListener((ChannelFuture futureListener) -> {
            if (!futureListener.isSuccess()) {
                log.debug("雷达【{}:{}】连接失败", host, port);
                futureListener.channel().close();
                // Remember the failed radar so the retry task picks it up
                falseConnectSet.add(ardEquipRadar);
                // Drop any stale entries from the connection maps
                tureConnectMap.remove(futureListener.channel().id());
                SuccessConnectMap.remove(ardEquipRadar.getId());
            } else {
                log.debug("雷达【{}:{}】连接成功", host, port);
                // No longer a failed radar
                falseConnectSet.remove(ardEquipRadar);
                // Register the live connection in both maps
                tureConnectMap.put(futureListener.channel().id(), ardEquipRadar);
                SuccessConnectMap.put(ardEquipRadar.getId(), futureListener.channel());
            }
        });
    }

    /**
     * Releases the Netty event loop threads when the bean is destroyed; this also stops the
     * periodic retry task scheduled on them.
     */
    @PreDestroy
    public void shutdown() {
        if (eventLoopGroup != null) {
            eventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Start-up entry point: when the TCP client is enabled, initializes Netty and starts a
     * connection attempt for every radar returned by the radar service.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        // Null-safe: an unset flag is treated as disabled
        if (!Boolean.TRUE.equals(nettyTcpConfig.getEnabled())) {
            return;
        }
        initNettyTcp(); // initialize the Netty TCP client
        List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
        for (ArdEquipRadar ardEquipRadar : ardEquipRadars) {
            log.debug("TCP client try to connect radar【{}:{}】", ardEquipRadar.getIp(), ardEquipRadar.getPort());
            connectServer(ardEquipRadar); // connect to each radar service
        }
    }
}