| | |
| | | package com.ard.utils.netty.tcp; |
| | | |
| | | /** |
| | | * @Description: |
| | | * @ClassName: init |
| | | * @Author: 刘苏义 |
| | | * @Date: 2023年07月05日13:11 |
| | | * @Version: 1.0 |
| | | **/ |
| | | |
| | | import com.ard.alarm.radar.domain.ArdEquipRadar; |
| | | import com.ard.alarm.radar.service.IArdEquipRadarService; |
| | | import com.ard.utils.netty.config.NettyTcpConfiguration; |
| | | import io.netty.bootstrap.Bootstrap; |
| | | import io.netty.channel.*; |
| | | import io.netty.channel.nio.NioEventLoopGroup; |
| | | import io.netty.channel.socket.SocketChannel; |
| | | import io.netty.channel.socket.nio.NioSocketChannel; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.boot.ApplicationArguments; |
| | | import org.springframework.boot.ApplicationRunner; |
| | | import org.springframework.core.annotation.Order; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.util.*; |
| | | import java.util.concurrent.CompletableFuture; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.CopyOnWriteArraySet; |
| | | import java.util.function.Consumer; |
| | | |
| | | /** |
| | | * 客户端初始化 |
| | | * |
| | | * @author lijiamin |
| | | */ |
| | | @Component |
| | | @Slf4j(topic = "netty") |
| | | @Order(2) |
| | | public class ClientInitialize implements ApplicationRunner { |
| | | @Resource |
| | | NettyTcpConfiguration nettyTcpConfig; |
| | | @Resource |
| | | IArdEquipRadarService ardEquipRadarService; |
| | | |
| | | private Bootstrap bootstrap; |
| | | public static CopyOnWriteArraySet<ArdEquipRadar> falseConnectSet = new CopyOnWriteArraySet(); |
| | | public static ConcurrentHashMap<String, ArdEquipRadar> tureConnectMap = new ConcurrentHashMap(); |
| | | public static ConcurrentHashMap<String, Object> SuccessConnectMap = new ConcurrentHashMap(); |
| | | /** |
| | | * Netty初始化配置 |
| | | */ |
| | | public void initNettyTcp() { |
| | | EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); |
| | | bootstrap = new Bootstrap(); |
| | | bootstrap.group(eventLoopGroup) |
| | | .channel(NioSocketChannel.class) |
| | | .option(ChannelOption.TCP_NODELAY, true) |
| | | .option(ChannelOption.SO_KEEPALIVE, true) |
| | | .handler(new ChannelInitializer<SocketChannel>() { |
| | | @Override |
| | | protected void initChannel(SocketChannel socketChannel) { |
| | | socketChannel.pipeline().addLast(new ClientHandler()); |
| | | } |
| | | }); |
| | | |
| | | // 异步持续监听连接失败的地址 |
| | | CompletableFuture.runAsync(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | while (true) { |
| | | try { |
| | | if (falseConnectSet.size() != 0) { |
| | | // 循环集合内元素 |
| | | falseConnectSet.forEach(new Consumer<ArdEquipRadar>() { |
| | | @Override |
| | | public void accept(ArdEquipRadar ardEquipRadar) { |
| | | connectServer(ardEquipRadar); |
| | | } |
| | | }); |
| | | } |
| | | Thread.sleep(10000); |
| | | } catch (Exception e) { |
| | | log.error("Netty初始化配置监听地址出现异常"); |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | |
| | | /** |
| | | * 服务连接 |
| | | * |
| | | * @param ardEquipRadar |
| | | */ |
| | | public void connectServer(ArdEquipRadar ardEquipRadar) { |
| | | // 获取地址及端口 |
| | | String host = ardEquipRadar.getIp(); |
| | | Integer port = ardEquipRadar.getPort(); |
| | | // 异步连接tcp服务端 |
| | | bootstrap.remoteAddress(host, port).connect().addListener((ChannelFuture futureListener) -> { |
| | | if (!futureListener.isSuccess()) { |
| | | log.debug("雷达【" + host + ":" + port + "】连接失败"); |
| | | futureListener.channel().close(); |
| | | // 连接失败信息插入Set |
| | | falseConnectSet.add(ardEquipRadar); |
| | | // 连接失败信息从map移除 |
| | | tureConnectMap.remove( host + ":" + port); |
| | | SuccessConnectMap.remove(ardEquipRadar.getId()); |
| | | } else { |
| | | log.debug("雷达【" + host + ":" + port + "】连接成功"); |
| | | // 连接成功信息从Set拔除 |
| | | falseConnectSet.remove(ardEquipRadar); |
| | | // 连接成功信息写入map |
| | | tureConnectMap.put(host+":"+port, ardEquipRadar); |
| | | SuccessConnectMap.put(ardEquipRadar.getId(),futureListener.channel()); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | /** |
| | | * 初始化方法 |
| | | */ |
| | | @Override |
| | | public void run(ApplicationArguments args) { |
| | | if (!nettyTcpConfig.getEnabled()) { |
| | | return; |
| | | } |
| | | initNettyTcp();//初始化nettyTcp |
| | | List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar()); |
| | | for (ArdEquipRadar ardEquipRadar : ardEquipRadars) { |
| | | String host = ardEquipRadar.getIp(); |
| | | Integer port = Integer.valueOf(ardEquipRadar.getPort()); |
| | | log.debug("TCP client try to connect radar【" + host + ":" + port+"】"); |
| | | connectServer(ardEquipRadar);//连接每一个雷达服务 |
| | | } |
| | | } |
| | | } |
| | | package com.ard.utils.netty.tcp;
|
| | |
|
| | | import com.ard.alarm.radar.domain.ArdEquipRadar;
|
| | | import com.ard.alarm.radar.service.IArdEquipRadarService;
|
| | | import com.ard.utils.netty.config.NettyTcpConfiguration;
|
| | | import io.netty.bootstrap.Bootstrap;
|
| | | import io.netty.channel.*;
|
| | | import io.netty.channel.nio.NioEventLoopGroup;
|
| | | import io.netty.channel.socket.SocketChannel;
|
| | | import io.netty.channel.socket.nio.NioSocketChannel;
|
| | | import lombok.extern.slf4j.Slf4j;
|
| | | import org.springframework.boot.ApplicationArguments;
|
| | | import org.springframework.boot.ApplicationRunner;
|
| | | import org.springframework.core.annotation.Order;
|
| | | import org.springframework.scheduling.annotation.Scheduled;
|
| | | import org.springframework.stereotype.Component;
|
| | |
|
| | | import javax.annotation.Resource;
|
| | | import java.util.*;
|
| | | import java.util.concurrent.CompletableFuture;
|
| | | import java.util.concurrent.ConcurrentHashMap;
|
| | | import java.util.concurrent.CopyOnWriteArraySet;
|
| | | import java.util.function.Consumer;
|
| | |
|
| | | /**
|
| | | * 客户端初始化
|
| | | *
|
| | | * @author lijiamin
|
| | | */
|
| | | @Component
|
| | | @Slf4j(topic = "netty")
|
| | | @Order(2)
|
| | | public class ClientInitialize implements ApplicationRunner {
|
| | | @Resource
|
| | | NettyTcpConfiguration nettyTcpConfig;
|
| | | @Resource
|
| | | IArdEquipRadarService ardEquipRadarService;
|
| | |
|
| | | private Bootstrap bootstrap;
|
| | | public static CopyOnWriteArraySet<ArdEquipRadar> falseConnectSet = new CopyOnWriteArraySet();//失败连接的雷达Set
|
| | | public static ConcurrentHashMap<String, ArdEquipRadar> trueConnectMap = new ConcurrentHashMap();//成功连接的ip端口对应的雷达
|
| | | public static ConcurrentHashMap<String, MessageHandler> SucMessageHandlerMap = new ConcurrentHashMap();//成功连接的ip端口对应的报文解析器
|
| | | public static ConcurrentHashMap<String, Channel> SucChannelMap = new ConcurrentHashMap();//成功连接的ip端口对应的netty通道
|
| | |
|
| | | /**
|
| | | * Netty初始化配置
|
| | | */
|
| | | public void initNettyTcp() {
|
| | | EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
|
| | | bootstrap = new Bootstrap();
|
| | | bootstrap.group(eventLoopGroup)
|
| | | .channel(NioSocketChannel.class)
|
| | | .option(ChannelOption.TCP_NODELAY, true)
|
| | | .option(ChannelOption.SO_KEEPALIVE, true)
|
| | | .handler(new ChannelInitializer<SocketChannel>() {
|
| | | @Override
|
| | | protected void initChannel(SocketChannel socketChannel) {
|
| | | socketChannel.pipeline().addLast(new ClientHandler());
|
| | | }
|
| | | });
|
| | |
|
| | | //异步持续监听连接失败的地址
|
| | | CompletableFuture.runAsync(new Runnable() {
|
| | | @Override
|
| | | public void run() {
|
| | | while (true) {
|
| | | try {
|
| | | if (falseConnectSet.size() != 0) {
|
| | | // 循环集合内元素
|
| | | falseConnectSet.forEach(new Consumer<ArdEquipRadar>() {
|
| | | @Override
|
| | | public void accept(ArdEquipRadar radar) {
|
| | | connectServer(radar);
|
| | | }
|
| | | });
|
| | | }
|
| | | Thread.sleep(10000);
|
| | | } catch (Exception e) {
|
| | | log.error("Netty初始化配置监听地址出现异常");
|
| | | e.printStackTrace();
|
| | | }
|
| | | }
|
| | | }
|
| | | });
|
| | | }
|
| | |
|
| | | /**
|
| | | * 服务连接
|
| | | *
|
| | | * @param ardEquipRadar
|
| | | */
|
| | | public void connectServer(ArdEquipRadar ardEquipRadar) {
|
| | | // 获取地址及端口
|
| | | String host = ardEquipRadar.getIp();
|
| | | Integer port = ardEquipRadar.getPort();
|
| | | String ipPort = host + ":" + port;
|
| | | // 异步连接tcp服务端
|
| | | bootstrap.remoteAddress(host, port).connect().addListener((ChannelFuture futureListener) -> {
|
| | | if (futureListener.isSuccess()) {
|
| | | log.debug("雷达【" + ipPort + "】连接成功");
|
| | | // 连接成功信息从Set拔除
|
| | | falseConnectSet.remove(ardEquipRadar);
|
| | | // 连接成功信息写入map
|
| | | trueConnectMap.put(ipPort, ardEquipRadar);
|
| | | SucMessageHandlerMap.put(ipPort, new MessageHandler());
|
| | | SucChannelMap.put(ipPort, futureListener.channel());
|
| | | } else {
|
| | | log.debug("雷达【" + ipPort + "】连接失败");
|
| | | futureListener.channel().close();
|
| | | // 连接失败信息插入Set
|
| | | falseConnectSet.add(ardEquipRadar);
|
| | |
|
| | | }
|
| | | });
|
| | | }
|
| | |
|
| | | /**
|
| | | * 初始化方法
|
| | | */
|
| | | @Override
|
| | | public void run(ApplicationArguments args) {
|
| | | if (!nettyTcpConfig.getEnabled()) {
|
| | | return;
|
| | | }
|
| | | initNettyTcp();//初始化nettyTcp
|
| | | List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
|
| | | for (ArdEquipRadar ardEquipRadar : ardEquipRadars) {
|
| | | String host = ardEquipRadar.getIp();
|
| | | Integer port = Integer.valueOf(ardEquipRadar.getPort());
|
| | | log.debug("TCP client try to connect radar【" + host + ":" + port + "】");
|
| | | connectServer(ardEquipRadar);//连接每一个雷达服务
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | * 监测雷达连接状态
|
| | | */
|
| | | @Scheduled(cron = "0/5 * * * * ?")
|
| | | public void monitorConnectStatus() {
|
| | | log.debug("定时监测雷达连接状态");
|
| | | List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
|
| | | ardEquipRadars.stream().forEach(ardEquipRadar -> {
|
| | | if (ardEquipRadar.getState() == null || (ardEquipRadar.getState() != null && !ardEquipRadar.getState().equals("0"))) {
|
| | | ardEquipRadar.setState("0");
|
| | | ardEquipRadarService.updateArdEquipRadar(ardEquipRadar);
|
| | | }
|
| | | else
|
| | | {
|
| | | if (trueConnectMap.containsKey(ardEquipRadar.getIp() + ":" + ardEquipRadar.getPort())) {
|
| | | if (!ardEquipRadar.getState().equals("1")) {
|
| | | ardEquipRadar.setState("1");
|
| | | ardEquipRadarService.updateArdEquipRadar(ardEquipRadar);
|
| | | }
|
| | | }
|
| | | }
|
| | | });
|
| | | }
|
| | | }
|