‘liusuyi’
2023-12-28 1c490ff27a0eeb4327923a9367f3a7c2aaa929b2
优化雷达tcp客户端
已修改6个文件
177 ■■■■ 文件已修改
src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInboundHandlerAdapter.java 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInitializer.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/utils/netty/tcp/BootNettyClient.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/utils/netty/tcp/BootNettyClientChannelCache.java 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/utils/netty/tcp/BootNettyClientThread.java 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/utils/netty/tcp/DynamicClient.java 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInboundHandlerAdapter.java
@@ -9,10 +9,7 @@
import com.ard.utils.util.GisUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.EmptyByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -21,59 +18,44 @@
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static com.ard.utils.util.ByteUtils.byteToBitString;
import static com.ard.utils.util.ByteUtils.toLittleEndian;
@Slf4j(topic = "netty")
@ChannelHandler.Sharable
public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
public class BootNettyChannelInboundHandlerAdapter extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 从服务端收到新的数据时,这个方法会在收到消息时被调用
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
        if(msg == null){
            return;
        }
        //System.out.println("channelRead:read msg:"+msg1.toString());
        //BootNettyClientChannel bootNettyClientChannel = BootNettyClientChannelCache.get("clientId:"+ctx.channel().id().toString());
        //if(bootNettyClientChannel != null){
        //    System.out.println("to do");
        //    bootNettyClientChannel.setLast_data(msg1.toString());
        //}
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String host = inSocket.getAddress().getHostAddress();
        int port = inSocket.getPort();
        ArdEquipRadar ardEquipRadar = ClientInitialize.tureConnectMap.get(host+":"+port);
        // msg转Buf
        ByteBuf buf = (ByteBuf) msg;
        // 创建缓冲中字节数的字节数组
        byte[] byteArray = new byte[buf.readableBytes()];
        // 写入数组
        buf.readBytes(byteArray);
        // 处理接收到的消息
        byte[] bytes = MessageParsing.receiveCompletePacket(byteArray);
        if (bytes != null) {
            processData(ardEquipRadar, bytes);
        ArdEquipRadar ardEquipRadar = BootNettyClientChannelCache.getRadar(host + ":" + port);
        if (ardEquipRadar != null) {
            // 创建缓冲中字节数的字节数组
            byte[] byteArray = new byte[msg.readableBytes()];
            // 写入数组
            msg.readBytes(byteArray);
            // 处理接收到的消息
            byte[] bytes = MessageParsing.receiveCompletePacket(byteArray);
            if (bytes != null) {
                processData(ardEquipRadar, bytes);
            }
        }
        //回应服务端
        //ctx.write("I got server message thanks server!");
    }
    /**
     * 从服务端收到新的数据、读取完成时调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
        System.out.println("channelReadComplete");
        //System.out.println("channelReadComplete");
        ctx.flush();
    }
    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     */
@@ -83,7 +65,7 @@
        cause.printStackTrace();
        ctx.close();//抛出异常,断开与客户端的连接
    }
    /**
     * 客户端与服务端第一次建立连接时 执行
     */
@@ -92,25 +74,29 @@
        super.channelActive(ctx);
        // 客户端与服务端 建立连接
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        String host = inSocket.getAddress().getHostAddress();
        int port = inSocket.getPort();
        log.debug("连接成功:【"+clientIp+":"+port+"】");
        log.debug("连接成功:【" + host + ":" + port + "】");
    }
    /**
     * 客户端与服务端 断连时 执行
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception{
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        int port = ipSocket.getPort();
        String host = ipSocket.getHostString();
        log.error("与设备" + host + ":" + port + "连接断开!");
        // 重连
        BootNettyClientThread thread = new BootNettyClientThread(host,port);
        thread.start();
        ArdEquipRadar ardEquipRadar = BootNettyClientChannelCache.getRadar(host + ":" + port);
        if (ardEquipRadar != null) {
            BootNettyClientThread thread = new BootNettyClientThread(ardEquipRadar);
            thread.start();
        }
    }
    /**
     * 解析报警数据
     */
@@ -150,7 +136,7 @@
            List<ArdAlarmRadar> well = new ArrayList<>();
            String alarmTime = "";
            Integer targetNum = 0;
            log.debug("Processing radar data 【" + radarName + "】数据-->命令ID:"  + cmdIdStr + "二进制:" + byteToBitString(cmdId[0]));
            log.debug("Processing radar data 【" + radarName + "】数据-->命令ID:" + cmdIdStr + "二进制:" + byteToBitString(cmdId[0]));
            //雷达移动防火报警
            if (Arrays.equals(cmdId, new byte[]{0x01})) {
                //region 告警信息反馈
src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInitializer.java
@@ -1,25 +1,20 @@
package com.ard.utils.netty.tcp;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
@ChannelHandler.Sharable
public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
public class BootNettyChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
    protected void initChannel(SocketChannel ch){
        //ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        //ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        /**
         * 自定义ChannelInboundHandlerAdapter
         */
        ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter());
    }
}
src/main/java/com/ard/utils/netty/tcp/BootNettyClient.java
@@ -23,6 +23,8 @@
    IArdEquipRadarService ardEquipRadarService;
    @Resource
    NettyTcpConfiguration nettyTcpConfig;
    static Integer waitTimes = 1;
    static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    /**
@@ -33,17 +35,21 @@
            group = eventLoopGroup;
        }
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true).handler(new BootNettyChannelInitializer<SocketChannel>());
        bootstrap.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new BootNettyChannelInitializer());
        return bootstrap;
    }
    public void connect( String host,int port) throws Exception {
    public void connect(ArdEquipRadar radar) throws Exception {
        String host = radar.getIp();
        int port=radar.getPort();
        log.debug("正在进行连接:【" + host+":"+port+"】");
        eventLoopGroup.shutdownGracefully();
        eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = getBootstrap(null);
        try {
            bootstrap.remoteAddress(host, port);
            // 异步连接tcp服务端
@@ -57,13 +63,14 @@
                    bootNettyClientChannel.setChannel(channel);
                    bootNettyClientChannel.setCode("clientId:" + id);
                    BootNettyClientChannelCache.save("clientId:" + id, bootNettyClientChannel);
                    BootNettyClientChannelCache.save(host+":"+port,radar);
                    log.debug("netty client start success=" + id);
                } else {
//                    System.err.println("连接失败," + waitTimes.toString() + "秒后重新连接:" + host);
                    try {
                        Thread.sleep(waitTimes * 1000);
                    } finally {
                        connect(host,port);
                        connect(radar);
                    }
                }
            });
@@ -73,7 +80,7 @@
            try {
                Thread.sleep(waitTimes * 1000);
            } finally {
                connect(host,port);
                connect(radar);
            }
            e.printStackTrace();
        } finally {
@@ -82,7 +89,7 @@
             */
            eventLoopGroup.shutdownGracefully().sync();
        }
    }
    /**
     * 初始化方法
@@ -97,7 +104,7 @@
            String host = ardEquipRadar.getIp();
            Integer port = Integer.valueOf(ardEquipRadar.getPort());
            log.debug("TCP client try to connect radar【:" + host + ":" + port+"】");
            BootNettyClientThread thread = new BootNettyClientThread(host,port);
            BootNettyClientThread thread = new BootNettyClientThread(ardEquipRadar);
            thread.start();
        }
    }
src/main/java/com/ard/utils/netty/tcp/BootNettyClientChannelCache.java
@@ -1,29 +1,41 @@
package com.ard.utils.netty.tcp;
import com.ard.alarm.radar.domain.ArdEquipRadar;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class BootNettyClientChannelCache {
    public static volatile Map<String, ArdEquipRadar> radarMapCache = new ConcurrentHashMap<String, ArdEquipRadar>();
    public static volatile Map<String, BootNettyClientChannel> channelMapCache = new ConcurrentHashMap<String, BootNettyClientChannel>();
 
    public static void add(String code, BootNettyClientChannel channel){
        channelMapCache.put(code,channel);
    }
    public static void addRadar(String code, ArdEquipRadar radar){
        radarMapCache.put(code,radar);
    }
    public static BootNettyClientChannel get(String code){
        return channelMapCache.get(code);
    }
    public static ArdEquipRadar getRadar(String code){
        return radarMapCache.get(code);
    }
    public static void remove(String code){
        channelMapCache.remove(code);
    }
    public static void removeRadar(String code){
        radarMapCache.remove(code);
    }
    public static void save(String code, BootNettyClientChannel channel) {
        if(channelMapCache.get(code) == null) {
            add(code,channel);
        }
    }
    public static void save(String code, ArdEquipRadar radar) {
        if(radarMapCache.get(code) == null) {
            addRadar(code,radar);
        }
    }
 
}
src/main/java/com/ard/utils/netty/tcp/BootNettyClientThread.java
@@ -1,18 +1,18 @@
package com.ard.utils.netty.tcp;
public class BootNettyClientThread extends Thread {
    private final int port;
    private final String host;
import com.ard.alarm.radar.domain.ArdEquipRadar;
    public BootNettyClientThread(String host,int port){
        this.port = port;
        this.host = host;
public class BootNettyClientThread extends Thread {
    private final ArdEquipRadar ardEquipRadar;
    public BootNettyClientThread(ArdEquipRadar ardEquipRadar){
        this.ardEquipRadar = ardEquipRadar;
    }
 
    public void run() {
        try {
            new BootNettyClient().connect(host,port);
            new BootNettyClient().connect(ardEquipRadar);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
src/main/java/com/ard/utils/netty/tcp/DynamicClient.java
@@ -11,6 +11,7 @@
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@@ -131,22 +132,20 @@
        if (!nettyTcpConfig.getEnabled()) {
            return;
        }
        //EventLoopGroup group = new NioEventLoopGroup();
        //Bootstrap bootstrap = new Bootstrap();
        //bootstrap.group(group)
        //        .channel(NioSocketChannel.class)
        //        .option(ChannelOption.TCP_NODELAY, true)
        //        .option(ChannelOption.SO_KEEPALIVE, true)
        //        .handler(new DynamicClientInitializer());
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new DynamicClientInitializer());
        List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar());
        for (ArdEquipRadar ardEquipRadar : ardEquipRadars) {
            String host = ardEquipRadar.getIp();
            Integer port = Integer.valueOf(ardEquipRadar.getPort());
            log.debug("TCP client try to connect radar【:" + host + ":" + port+"】");
            // connectServer(ardEquipRadar);//连接每一个雷达服务
           // connect(bootstrap, ardEquipRadar);
            BootNettyClientThread thread = new BootNettyClientThread(host,port);
            thread.start();
            log.debug("TCP client try to connect radar【:" + host + ":" + port + "】");
           // connectServer(ardEquipRadar);//连接每一个雷达服务
            connect(bootstrap, ardEquipRadar);
        }
    }
}