package org.yzh.client.netty; import io.github.yezhihao.netmc.core.model.Message; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.MessageToByteEncoder; import lombok.extern.slf4j.Slf4j; import java.util.List; /** * @author yezhihao * https://gitee.com/yezhihao/jt808-server */ @Slf4j public class TCPClient { private EventLoopGroup workerGroup; private Channel channel; private ClientConfig config; private String id; public TCPClient(String id, ClientConfig config) { this.id = id; this.config = config; } private void startInternal() { try { this.workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(workerGroup); bootstrap.option(NioChannelOption.SO_REUSEADDR, true) .option(NioChannelOption.TCP_NODELAY, true) .option(NioChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer() { @Override public void initChannel(NioSocketChannel ch) { ch.pipeline() .addLast("frameDecoder", new DelimiterBasedFrameDecoder(config.maxFrameLength, Unpooled.wrappedBuffer(config.delimiter), Unpooled.wrappedBuffer(config.delimiter, config.delimiter))) .addLast("decoder", new ByteToMessageDecoder() { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) { log.info("<<<<<{}", ByteBufUtil.hexDump(buf)); Object msg = config.decoder.decode(buf, null); log.info("<<<<<<<<<<{}", msg); if (msg != null) out.add(msg); buf.skipBytes(buf.readableBytes()); } }) .addLast("encoder", new MessageToByteEncoder() { @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) { log.info(">>>>>>>>>>{}", msg); ByteBuf buf = config.encoder.encode(msg, null); log.info(">>>>>{}", ByteBufUtil.hexDump(buf)); out.writeBytes(config.delimiter).writeBytes(buf).writeBytes(config.delimiter); buf.release(); } }) .addLast("adapter", config.adapter); } }); ChannelFuture channelFuture = bootstrap.connect(config.ip, config.port).sync(); this.channel = channelFuture.channel(); this.channel.closeFuture(); } catch (Exception e) { log.error("===TCP Client异常关闭", e); } } public void writeObject(Object message) { channel.writeAndFlush(message); } public synchronized TCPClient start() { startInternal(); log.warn("===TCP Client启动成功, id={}===", id); return this; } public synchronized void stop() { workerGroup.shutdownGracefully(); log.warn("===TCP Client已经停止, id={}===", id); } }