src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInboundHandlerAdapter.java
@@ -9,10 +9,7 @@ import com.ard.utils.util.GisUtils; import io.netty.buffer.ByteBuf; import io.netty.buffer.EmptyByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -21,59 +18,44 @@ import java.net.InetSocketAddress; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static com.ard.utils.util.ByteUtils.byteToBitString; import static com.ard.utils.util.ByteUtils.toLittleEndian; @Slf4j(topic = "netty") @ChannelHandler.Sharable public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter { public class BootNettyChannelInboundHandlerAdapter extends SimpleChannelInboundHandler<ByteBuf> { /** * 从服务端收到新的数据时,这个方法会在收到消息时被调用 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException { if(msg == null){ return; } //System.out.println("channelRead:read msg:"+msg1.toString()); //BootNettyClientChannel bootNettyClientChannel = BootNettyClientChannelCache.get("clientId:"+ctx.channel().id().toString()); //if(bootNettyClientChannel != null){ // System.out.println("to do"); // bootNettyClientChannel.setLast_data(msg1.toString()); //} protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress(); String host = inSocket.getAddress().getHostAddress(); int port = inSocket.getPort(); ArdEquipRadar ardEquipRadar = ClientInitialize.tureConnectMap.get(host+":"+port); // msg转Buf ByteBuf buf = (ByteBuf) msg; // 创建缓冲中字节数的字节数组 byte[] byteArray = new byte[buf.readableBytes()]; // 写入数组 buf.readBytes(byteArray); // 处理接收到的消息 byte[] bytes = MessageParsing.receiveCompletePacket(byteArray); if (bytes != null) { 
processData(ardEquipRadar, bytes); ArdEquipRadar ardEquipRadar = BootNettyClientChannelCache.getRadar(host + ":" + port); if (ardEquipRadar != null) { // 创建缓冲中字节数的字节数组 byte[] byteArray = new byte[msg.readableBytes()]; // 写入数组 msg.readBytes(byteArray); // 处理接收到的消息 byte[] bytes = MessageParsing.receiveCompletePacket(byteArray); if (bytes != null) { processData(ardEquipRadar, bytes); } } //回应服务端 //ctx.write("I got server message thanks server!"); } /** * 从服务端收到新的数据、读取完成时调用 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { System.out.println("channelReadComplete"); //System.out.println("channelReadComplete"); ctx.flush(); } /** * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时 */ @@ -83,7 +65,7 @@ cause.printStackTrace(); ctx.close();//抛出异常,断开与客户端的连接 } /** * 客户端与服务端第一次建立连接时 执行 */ @@ -92,25 +74,29 @@ super.channelActive(ctx); // 客户端与服务端 建立连接 InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = inSocket.getAddress().getHostAddress(); String host = inSocket.getAddress().getHostAddress(); int port = inSocket.getPort(); log.debug("连接成功:【"+clientIp+":"+port+"】"); log.debug("连接成功:【" + host + ":" + port + "】"); } /** * 客户端与服务端 断连时 执行 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception{ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); log.error("与设备" + host + ":" + port + "连接断开!"); // 重连 BootNettyClientThread thread = new BootNettyClientThread(host,port); thread.start(); ArdEquipRadar ardEquipRadar = BootNettyClientChannelCache.getRadar(host + ":" + port); if (ardEquipRadar != null) { BootNettyClientThread thread = new BootNettyClientThread(ardEquipRadar); thread.start(); } } /** * 解析报警数据 */ @@ -150,7 +136,7 @@ List<ArdAlarmRadar> well = new 
ArrayList<>(); String alarmTime = ""; Integer targetNum = 0; log.debug("Processing radar data 【" + radarName + "】数据-->命令ID:" + cmdIdStr + "二进制:" + byteToBitString(cmdId[0])); log.debug("Processing radar data 【" + radarName + "】数据-->命令ID:" + cmdIdStr + "二进制:" + byteToBitString(cmdId[0])); //雷达移动防火报警 if (Arrays.equals(cmdId, new byte[]{0x01})) { //region 告警信息反馈 src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInitializer.java
@@ -1,25 +1,20 @@ package com.ard.utils.netty.tcp; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; @ChannelHandler.Sharable public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> { public class BootNettyChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); protected void initChannel(SocketChannel ch){ //ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); //ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); /** * 自定义ChannelInboundHandlerAdapter */ ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter()); } } src/main/java/com/ard/utils/netty/tcp/BootNettyClient.java
@@ -23,6 +23,8 @@ IArdEquipRadarService ardEquipRadarService; @Resource NettyTcpConfiguration nettyTcpConfig; static Integer waitTimes = 1; static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); /** @@ -33,17 +35,21 @@ group = eventLoopGroup; } Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true).handler(new BootNettyChannelInitializer<SocketChannel>()); bootstrap.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new BootNettyChannelInitializer()); return bootstrap; } public void connect( String host,int port) throws Exception { public void connect(ArdEquipRadar radar) throws Exception { String host = radar.getIp(); int port=radar.getPort(); log.debug("正在进行连接:【" + host+":"+port+"】"); eventLoopGroup.shutdownGracefully(); eventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = getBootstrap(null); try { bootstrap.remoteAddress(host, port); // 异步连接tcp服务端 @@ -57,13 +63,14 @@ bootNettyClientChannel.setChannel(channel); bootNettyClientChannel.setCode("clientId:" + id); BootNettyClientChannelCache.save("clientId:" + id, bootNettyClientChannel); BootNettyClientChannelCache.save(host+":"+port,radar); log.debug("netty client start success=" + id); } else { // System.err.println("连接失败," + waitTimes.toString() + "秒后重新连接:" + host); try { Thread.sleep(waitTimes * 1000); } finally { connect(host,port); connect(radar); } } }); @@ -73,7 +80,7 @@ try { Thread.sleep(waitTimes * 1000); } finally { connect(host,port); connect(radar); } e.printStackTrace(); } finally { @@ -82,7 +89,7 @@ */ eventLoopGroup.shutdownGracefully().sync(); } } /** * 初始化方法 @@ -97,7 +104,7 @@ String host = ardEquipRadar.getIp(); Integer port = Integer.valueOf(ardEquipRadar.getPort()); log.debug("TCP client try to connect radar【:" + host + ":" + port+"】"); BootNettyClientThread thread = new 
BootNettyClientThread(host,port); BootNettyClientThread thread = new BootNettyClientThread(ardEquipRadar); thread.start(); } } src/main/java/com/ard/utils/netty/tcp/BootNettyClientChannelCache.java
@@ -1,29 +1,41 @@
package com.ard.utils.netty.tcp;

import com.ard.alarm.radar.domain.ArdEquipRadar;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Process-wide registry of Netty client channels and of the radar devices they belong to.
 *
 * <p>Both registries are backed by {@link ConcurrentHashMap}, so individual operations are safe
 * to call from several threads. Callers visible in this change set key the channel registry by
 * {@code "clientId:" + channelId} and the radar registry by {@code host + ":" + port}.
 *
 * <p>Neither registry accepts {@code null} keys or values; passing one results in a
 * {@link NullPointerException} from the underlying map.
 */
public class BootNettyClientChannelCache {

    /** Radar devices, looked up by the key the caller registered them under. */
    public static volatile Map<String, ArdEquipRadar> radarMapCache = new ConcurrentHashMap<>();

    /** Client channel wrappers, looked up by the key the caller registered them under. */
    public static volatile Map<String, BootNettyClientChannel> channelMapCache = new ConcurrentHashMap<>();

    /**
     * Registers a channel, replacing any channel already stored under the same key.
     *
     * @param code    registry key
     * @param channel channel wrapper to store
     */
    public static void add(String code, BootNettyClientChannel channel) {
        channelMapCache.put(code, channel);
    }

    /**
     * Registers a radar, replacing any radar already stored under the same key.
     *
     * @param code  registry key
     * @param radar radar device to store
     */
    public static void addRadar(String code, ArdEquipRadar radar) {
        radarMapCache.put(code, radar);
    }

    /**
     * Looks up a channel.
     *
     * @param code registry key
     * @return the stored channel wrapper, or {@code null} when nothing is registered under the key
     */
    public static BootNettyClientChannel get(String code) {
        return channelMapCache.get(code);
    }

    /**
     * Looks up a radar.
     *
     * @param code registry key
     * @return the stored radar, or {@code null} when nothing is registered under the key
     */
    public static ArdEquipRadar getRadar(String code) {
        return radarMapCache.get(code);
    }

    /**
     * Removes the channel stored under the key, if any.
     *
     * @param code registry key
     */
    public static void remove(String code) {
        channelMapCache.remove(code);
    }

    /**
     * Removes the radar stored under the key, if any.
     *
     * @param code registry key
     */
    public static void removeRadar(String code) {
        radarMapCache.remove(code);
    }

    /**
     * Registers a channel only when the key is not yet in use; an existing entry is kept.
     *
     * @param code    registry key
     * @param channel channel wrapper to store when the key is free
     */
    public static void save(String code, BootNettyClientChannel channel) {
        // putIfAbsent is atomic; a separate get() followed by put() lets two threads both
        // observe "absent" and overwrite each other.
        channelMapCache.putIfAbsent(code, channel);
    }

    /**
     * Registers a radar only when the key is not yet in use; an existing entry is kept.
     *
     * @param code  registry key
     * @param radar radar device to store when the key is free
     */
    public static void save(String code, ArdEquipRadar radar) {
        // Atomic for the same reason as the channel overload.
        radarMapCache.putIfAbsent(code, radar);
    }
}
src/main/java/com/ard/utils/netty/tcp/BootNettyClientThread.java
@@ -1,18 +1,18 @@ package com.ard.utils.netty.tcp; public class BootNettyClientThread extends Thread { private final int port; private final String host; import com.ard.alarm.radar.domain.ArdEquipRadar; public BootNettyClientThread(String host,int port){ this.port = port; this.host = host; public class BootNettyClientThread extends Thread { private final ArdEquipRadar ardEquipRadar; public BootNettyClientThread(ArdEquipRadar ardEquipRadar){ this.ardEquipRadar = ardEquipRadar; } public void run() { try { new BootNettyClient().connect(host,port); new BootNettyClient().connect(ardEquipRadar); } catch (Exception e) { throw new RuntimeException(e); } src/main/java/com/ard/utils/netty/tcp/DynamicClient.java
@@ -11,6 +11,7 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; @@ -131,22 +132,20 @@ if (!nettyTcpConfig.getEnabled()) { return; } //EventLoopGroup group = new NioEventLoopGroup(); //Bootstrap bootstrap = new Bootstrap(); //bootstrap.group(group) // .channel(NioSocketChannel.class) // .option(ChannelOption.TCP_NODELAY, true) // .option(ChannelOption.SO_KEEPALIVE, true) // .handler(new DynamicClientInitializer()); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new DynamicClientInitializer()); List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar()); for (ArdEquipRadar ardEquipRadar : ardEquipRadars) { String host = ardEquipRadar.getIp(); Integer port = Integer.valueOf(ardEquipRadar.getPort()); log.debug("TCP client try to connect radar【:" + host + ":" + port+"】"); // connectServer(ardEquipRadar);//连接每一个雷达服务 // connect(bootstrap, ardEquipRadar); BootNettyClientThread thread = new BootNettyClientThread(host,port); thread.start(); log.debug("TCP client try to connect radar【:" + host + ":" + port + "】"); // connectServer(ardEquipRadar);//连接每一个雷达服务 connect(bootstrap, ardEquipRadar); } } }