From 1c490ff27a0eeb4327923a9367f3a7c2aaa929b2 Mon Sep 17 00:00:00 2001 From: ‘liusuyi’ <1951119284@qq.com> Date: 星期四, 28 十二月 2023 14:03:11 +0800 Subject: [PATCH] 优化雷达tcp客户端 --- src/main/java/com/ard/utils/netty/tcp/BootNettyClientChannelCache.java | 22 +++++- src/main/java/com/ard/utils/netty/tcp/DynamicClient.java | 23 +++---- src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInboundHandlerAdapter.java | 74 ++++++++++-------------- src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInitializer.java | 17 ++--- src/main/java/com/ard/utils/netty/tcp/BootNettyClientThread.java | 16 ++-- src/main/java/com/ard/utils/netty/tcp/BootNettyClient.java | 25 +++++--- 6 files changed, 88 insertions(+), 89 deletions(-) diff --git a/src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInboundHandlerAdapter.java b/src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInboundHandlerAdapter.java index d0bf436..561d287 100644 --- a/src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInboundHandlerAdapter.java +++ b/src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInboundHandlerAdapter.java @@ -9,10 +9,7 @@ import com.ard.utils.util.GisUtils; import io.netty.buffer.ByteBuf; import io.netty.buffer.EmptyByteBuf; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelId; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -21,59 +18,44 @@ import java.net.InetSocketAddress; import java.text.SimpleDateFormat; import java.util.*; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import static com.ard.utils.util.ByteUtils.byteToBitString; import static com.ard.utils.util.ByteUtils.toLittleEndian; @Slf4j(topic = "netty") -@ChannelHandler.Sharable -public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter { - +public class BootNettyChannelInboundHandlerAdapter extends SimpleChannelInboundHandler<ByteBuf> { + /** * 浠庢湇鍔$鏀跺埌鏂扮殑鏁版嵁鏃讹紝杩欎釜鏂规硶浼氬湪鏀跺埌娑堟伅鏃惰璋冪敤 */ @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException { - if(msg == null){ - return; - } - - //System.out.println("channelRead:read msg:"+msg1.toString()); - //BootNettyClientChannel bootNettyClientChannel = BootNettyClientChannelCache.get("clientId:"+ctx.channel().id().toString()); - //if(bootNettyClientChannel != null){ - // System.out.println("to do"); - // bootNettyClientChannel.setLast_data(msg1.toString()); - //} + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress(); String host = inSocket.getAddress().getHostAddress(); int port = inSocket.getPort(); - ArdEquipRadar ardEquipRadar = ClientInitialize.tureConnectMap.get(host+":"+port); - // msg杞珺uf - ByteBuf buf = (ByteBuf) msg; - // 鍒涘缓缂撳啿涓瓧鑺傛暟鐨勫瓧鑺傛暟缁� - byte[] byteArray = new byte[buf.readableBytes()]; - // 鍐欏叆鏁扮粍 - buf.readBytes(byteArray); - // 澶勭悊鎺ユ敹鍒扮殑娑堟伅 - byte[] bytes = MessageParsing.receiveCompletePacket(byteArray); - if (bytes != null) { - processData(ardEquipRadar, bytes); + ArdEquipRadar ardEquipRadar = BootNettyClientChannelCache.getRadar(host + ":" + port); + if (ardEquipRadar != null) { + // 鍒涘缓缂撳啿涓瓧鑺傛暟鐨勫瓧鑺傛暟缁� + byte[] byteArray = new byte[msg.readableBytes()]; + // 鍐欏叆鏁扮粍 + msg.readBytes(byteArray); + // 澶勭悊鎺ユ敹鍒扮殑娑堟伅 + byte[] bytes = MessageParsing.receiveCompletePacket(byteArray); + if (bytes != null) { + processData(ardEquipRadar, bytes); + } } - //鍥炲簲鏈嶅姟绔� - //ctx.write("I got server message thanks server!"); } - + /** * 浠庢湇鍔$鏀跺埌鏂扮殑鏁版嵁銆佽鍙栧畬鎴愭椂璋冪敤 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { - System.out.println("channelReadComplete"); + //System.out.println("channelReadComplete"); ctx.flush(); } - + /** * 褰撳嚭鐜� Throwable 瀵硅薄鎵嶄細琚皟鐢紝鍗冲綋 Netty 鐢变簬 IO 閿欒鎴栬�呭鐞嗗櫒鍦ㄥ鐞嗕簨浠舵椂鎶涘嚭鐨勫紓甯告椂 */ @@ -83,7 +65,7 @@ cause.printStackTrace(); ctx.close();//鎶涘嚭寮傚父锛屾柇寮�涓庡鎴风鐨勮繛鎺� } - + /** * 瀹㈡埛绔笌鏈嶅姟绔涓�娆″缓绔嬭繛鎺ユ椂 鎵ц */ @@ -92,25 +74,29 @@ super.channelActive(ctx); // 瀹㈡埛绔笌鏈嶅姟绔� 寤虹珛杩炴帴 InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress(); - String clientIp = inSocket.getAddress().getHostAddress(); + String host = inSocket.getAddress().getHostAddress(); int port = inSocket.getPort(); - log.debug("杩炴帴鎴愬姛锛氥��"+clientIp+":"+port+"銆�"); + log.debug("杩炴帴鎴愬姛锛氥��" + host + ":" + port + "銆�"); } - + /** * 瀹㈡埛绔笌鏈嶅姟绔� 鏂繛鏃� 鎵ц */ @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception{ + public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); log.error("涓庤澶�" + host + ":" + port + "杩炴帴鏂紑!"); // 閲嶈繛 - BootNettyClientThread thread = new BootNettyClientThread(host,port); - thread.start(); + ArdEquipRadar ardEquipRadar = BootNettyClientChannelCache.getRadar(host + ":" + port); + if (ardEquipRadar != null) { + BootNettyClientThread thread = new BootNettyClientThread(ardEquipRadar); + thread.start(); + } } + /** * 瑙f瀽鎶ヨ鏁版嵁 */ @@ -150,7 +136,7 @@ List<ArdAlarmRadar> well = new ArrayList<>(); String alarmTime = ""; Integer targetNum = 0; - log.debug("Processing radar data 銆�" + radarName + "銆戞暟鎹�-->鍛戒护ID:" + cmdIdStr + "浜岃繘鍒�:" + byteToBitString(cmdId[0])); + log.debug("Processing radar data 銆�" + radarName + "銆戞暟鎹�-->鍛戒护ID:" + cmdIdStr + "浜岃繘鍒�:" + byteToBitString(cmdId[0])); //闆疯揪绉诲姩闃茬伀鎶ヨ if (Arrays.equals(cmdId, new byte[]{0x01})) { //region 鍛婅淇℃伅鍙嶉 diff --git a/src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInitializer.java b/src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInitializer.java index 0965015..d87b9ff 100644 --- a/src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInitializer.java +++ b/src/main/java/com/ard/utils/netty/tcp/BootNettyChannelInitializer.java @@ -1,25 +1,20 @@ package com.ard.utils.netty.tcp; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; -@ChannelHandler.Sharable -public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> { - +public class BootNettyChannelInitializer extends ChannelInitializer<SocketChannel> { + @Override - protected void initChannel(Channel ch) throws Exception { - - ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); - ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); + protected void initChannel(SocketChannel ch){ + //ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); + //ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); /** * 鑷畾涔塁hannelInboundHandlerAdapter */ ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter()); - } - } \ No newline at end of file diff --git a/src/main/java/com/ard/utils/netty/tcp/BootNettyClient.java b/src/main/java/com/ard/utils/netty/tcp/BootNettyClient.java index 6006f7c..da103a6 100644 --- a/src/main/java/com/ard/utils/netty/tcp/BootNettyClient.java +++ b/src/main/java/com/ard/utils/netty/tcp/BootNettyClient.java @@ -23,6 +23,8 @@ IArdEquipRadarService ardEquipRadarService; @Resource NettyTcpConfiguration nettyTcpConfig; + + static Integer waitTimes = 1; static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); /** @@ -33,17 +35,21 @@ group = eventLoopGroup; } Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.SO_KEEPALIVE, true).handler(new BootNettyChannelInitializer<SocketChannel>()); + bootstrap.group(group).channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new BootNettyChannelInitializer()); return bootstrap; } - - public void connect( String host,int port) throws Exception { + + public void connect(ArdEquipRadar radar) throws Exception { + String host = radar.getIp(); + int port=radar.getPort(); log.debug("姝e湪杩涜杩炴帴锛氥��" + host+":"+port+"銆�"); eventLoopGroup.shutdownGracefully(); eventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = getBootstrap(null); - + try { bootstrap.remoteAddress(host, port); // 寮傛杩炴帴tcp鏈嶅姟绔� @@ -57,13 +63,14 @@ bootNettyClientChannel.setChannel(channel); bootNettyClientChannel.setCode("clientId:" + id); BootNettyClientChannelCache.save("clientId:" + id, bootNettyClientChannel); + BootNettyClientChannelCache.save(host+":"+port,radar); log.debug("netty client start success=" + id); } else { // System.err.println("杩炴帴澶辫触锛�" + waitTimes.toString() + "绉掑悗閲嶆柊杩炴帴:" + host); try { Thread.sleep(waitTimes * 1000); } finally { - connect(host,port); + connect(radar); } } }); @@ -73,7 +80,7 @@ try { Thread.sleep(waitTimes * 1000); } finally { - connect(host,port); + connect(radar); } e.printStackTrace(); } finally { @@ -82,7 +89,7 @@ */ eventLoopGroup.shutdownGracefully().sync(); } - + } /** * 鍒濆鍖栨柟娉� @@ -97,7 +104,7 @@ String host = ardEquipRadar.getIp(); Integer port = Integer.valueOf(ardEquipRadar.getPort()); log.debug("TCP client try to connect radar銆愶細" + host + ":" + port+"銆�"); - BootNettyClientThread thread = new BootNettyClientThread(host,port); + BootNettyClientThread thread = new BootNettyClientThread(ardEquipRadar); thread.start(); } } diff --git a/src/main/java/com/ard/utils/netty/tcp/BootNettyClientChannelCache.java b/src/main/java/com/ard/utils/netty/tcp/BootNettyClientChannelCache.java index 9cfb87b..4d31833 100644 --- a/src/main/java/com/ard/utils/netty/tcp/BootNettyClientChannelCache.java +++ b/src/main/java/com/ard/utils/netty/tcp/BootNettyClientChannelCache.java @@ -1,29 +1,41 @@ package com.ard.utils.netty.tcp; +import com.ard.alarm.radar.domain.ArdEquipRadar; + import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class BootNettyClientChannelCache { - + public static volatile Map<String, ArdEquipRadar> radarMapCache = new ConcurrentHashMap<String, ArdEquipRadar>(); public static volatile Map<String, BootNettyClientChannel> channelMapCache = new ConcurrentHashMap<String, BootNettyClientChannel>(); public static void add(String code, BootNettyClientChannel channel){ channelMapCache.put(code,channel); } - + public static void addRadar(String code, ArdEquipRadar radar){ + radarMapCache.put(code,radar); + } public static BootNettyClientChannel get(String code){ return channelMapCache.get(code); } - + public static ArdEquipRadar getRadar(String code){ + return radarMapCache.get(code); + } public static void remove(String code){ channelMapCache.remove(code); } - + public static void removeRadar(String code){ + radarMapCache.remove(code); + } public static void save(String code, BootNettyClientChannel channel) { if(channelMapCache.get(code) == null) { add(code,channel); } } - + public static void save(String code, ArdEquipRadar radar) { + if(radarMapCache.get(code) == null) { + addRadar(code,radar); + } + } } \ No newline at end of file diff --git a/src/main/java/com/ard/utils/netty/tcp/BootNettyClientThread.java b/src/main/java/com/ard/utils/netty/tcp/BootNettyClientThread.java index f08733f..8aa85ff 100644 --- a/src/main/java/com/ard/utils/netty/tcp/BootNettyClientThread.java +++ b/src/main/java/com/ard/utils/netty/tcp/BootNettyClientThread.java @@ -1,18 +1,18 @@ package com.ard.utils.netty.tcp; -public class BootNettyClientThread extends Thread { - - private final int port; - private final String host; +import com.ard.alarm.radar.domain.ArdEquipRadar; - public BootNettyClientThread(String host,int port){ - this.port = port; - this.host = host; +public class BootNettyClientThread extends Thread { + + private final ArdEquipRadar ardEquipRadar; + + public BootNettyClientThread(ArdEquipRadar ardEquipRadar){ + this.ardEquipRadar = ardEquipRadar; } public void run() { try { - new BootNettyClient().connect(host,port); + new BootNettyClient().connect(ardEquipRadar); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/ard/utils/netty/tcp/DynamicClient.java b/src/main/java/com/ard/utils/netty/tcp/DynamicClient.java index 94277ff..8d88675 100644 --- a/src/main/java/com/ard/utils/netty/tcp/DynamicClient.java +++ b/src/main/java/com/ard/utils/netty/tcp/DynamicClient.java @@ -11,6 +11,7 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; + import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; @@ -131,22 +132,20 @@ if (!nettyTcpConfig.getEnabled()) { return; } - //EventLoopGroup group = new NioEventLoopGroup(); - //Bootstrap bootstrap = new Bootstrap(); - //bootstrap.group(group) - // .channel(NioSocketChannel.class) - // .option(ChannelOption.TCP_NODELAY, true) - // .option(ChannelOption.SO_KEEPALIVE, true) - // .handler(new DynamicClientInitializer()); + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new DynamicClientInitializer()); List<ArdEquipRadar> ardEquipRadars = ardEquipRadarService.selectArdEquipRadarList(new ArdEquipRadar()); for (ArdEquipRadar ardEquipRadar : ardEquipRadars) { String host = ardEquipRadar.getIp(); Integer port = Integer.valueOf(ardEquipRadar.getPort()); - log.debug("TCP client try to connect radar銆愶細" + host + ":" + port+"銆�"); - // connectServer(ardEquipRadar);//杩炴帴姣忎竴涓浄杈炬湇鍔� - // connect(bootstrap, ardEquipRadar); - BootNettyClientThread thread = new BootNettyClientThread(host,port); - thread.start(); + log.debug("TCP client try to connect radar銆愶細" + host + ":" + port + "銆�"); + // connectServer(ardEquipRadar);//杩炴帴姣忎竴涓浄杈炬湇鍔� + connect(bootstrap, ardEquipRadar); } } } \ No newline at end of file -- Gitblit v1.9.3