package cn.org.hentai.jtt1078.server.backup;

import cn.org.hentai.jtt1078.entity.Jt1078Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * Splits an inbound byte stream into {@link Jt1078Message} frames.
 *
 * <p>Frame layout as parsed by this class (offsets relative to the first magic byte):
 * <pre>
 *   0..3   magic 0x30316364
 *   4      V/P/X/CC
 *   5      M/PT
 *   6..7   sequence number
 *   8..    SIM number, 6 bytes ("2016" layout) or 10 bytes ("2019" layout)
 *   +1     channel
 *   +1     data type / sub-package marker
 *   +8     timestamp
 *   +2     last I-frame interval
 *   +2     last frame interval
 *   +2     payload length
 *   +n     payload
 * </pre>
 *
 * <p>Bytes that do not start with the magic are discarded one at a time until a frame start is
 * found; the connection is closed once {@link #MAX_SKIP_BYTES} bytes were discarded within a single
 * {@code decode} call.
 */
@Slf4j
public class Jt1078MessageDecoder extends ByteToMessageDecoder {
    /** Not referenced by the decoding logic in this class; kept as declared originally. */
    private static final int HEADER_MIN_LEN = 34;

    /** Safety valve: maximum number of bytes discarded per decode call while resynchronising. */
    private static final int MAX_SKIP_BYTES = 4096;

    /** Frame magic and the mask applied to the first four bytes before comparing against it. */
    private static final int FRAME_MAGIC = 0x30316364;
    private static final int MAGIC_MASK = 0x7FFFFFFF;
    private static final int MAGIC_LEN = 4;

    /** V/P/X/CC (1) + M/PT (1) + sequence number (2). */
    private static final int PREFIX_LEN = 1 + 1 + 2;

    private static final int SIM_LEN_2016 = 6;
    private static final int SIM_LEN_2019 = 10;
    private static final int TIMESTAMP_LEN = 8;

    /** Channel (1) + data type (1) + timestamp (8) + two intervals (2 + 2) + payload length (2). */
    private static final int FIXED_TAIL_LEN = 1 + 1 + TIMESTAMP_LEN + 2 + 2 + 2;

    /** Window inspected by {@link #detectProtocolVersion(ByteBuf)}, relative to the reader index. */
    private static final int PROBE_OFFSET = 8;
    private static final int PROBE_LEN = 4;

    private static final char[] HEX_DIGITS = "0123456789abcdef".toCharArray();

    /**
     * Decodes as many complete frames as {@code in} currently holds.
     *
     * <p>When a frame is only partially available the reader index is moved back to the first
     * magic byte of that frame, so the next invocation re-parses it from the start once more data
     * has arrived.
     *
     * @param ctx channel context; used to close the connection when too many bytes are discarded
     * @param in  accumulated inbound bytes
     * @param out receives one {@link Jt1078Message} per complete frame
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int skippedBytes = 0;

        while (in.readableBytes() >= MAGIC_LEN) {
            // Remember where this candidate frame begins. The buffer's single mark slot is not
            // used because every rewind below must land exactly on the magic.
            final int frameStart = in.readerIndex();
            final int header = in.getInt(frameStart);

            if ((header & MAGIC_MASK) != FRAME_MAGIC) {
                // Slide the window by one byte and keep looking for the magic.
                in.skipBytes(1);
                skippedBytes++;
                if (skipLimitReached(ctx, skippedBytes)) {
                    return;
                }
                continue;
            }
            in.skipBytes(MAGIC_LEN);

            // The basic fields must be fully available before they are read.
            if (in.readableBytes() < PREFIX_LEN) {
                in.readerIndex(frameStart);
                return;
            }
            byte vpxcc = in.readByte();
            byte mpt = in.readByte();
            short seq = in.readShort();

            // The version probe looks ahead without consuming; wait until its window has arrived.
            if (in.readableBytes() < PROBE_OFFSET + PROBE_LEN) {
                in.readerIndex(frameStart);
                return;
            }
            boolean isV2019 = detectProtocolVersion(in);
            int simLength = isV2019 ? SIM_LEN_2019 : SIM_LEN_2016;

            // SIM + channel + data type + timestamp + both intervals + length field.
            if (in.readableBytes() < simLength + FIXED_TAIL_LEN) {
                in.readerIndex(frameStart);
                return;
            }

            byte[] simBytes = new byte[simLength];
            in.readBytes(simBytes);
            String sim = bcdToString(simBytes);

            byte channel = in.readByte();
            byte dataTypeByte = in.readByte();

            byte[] timestampBytes = new byte[TIMESTAMP_LEN];
            in.readBytes(timestampBytes);

            short lastIframeInterval = in.readShort();
            short lastFrameInterval = in.readShort();
            short dataLength = in.readShort();

            if (dataLength < 0) {
                log.warn("Invalid dataLength: {}", dataLength);
                // Treat the magic as a false positive: resume the search one byte after it.
                in.readerIndex(frameStart + 1);
                skippedBytes++;
                if (skipLimitReached(ctx, skippedBytes)) {
                    return;
                }
                continue;
            }

            if (in.readableBytes() < dataLength) {
                // Payload incomplete; wait for more data and re-parse the whole frame later.
                in.readerIndex(frameStart);
                return;
            }

            // NOTE(review): the payload is consumed here but never handed to the message; no
            // payload setter on Jt1078Message is visible from this file - confirm whether one
            // should be called.
            byte[] data = new byte[dataLength];
            in.readBytes(data);

            Jt1078Message message = new Jt1078Message();
            message.setHeader(header);
            message.setVpxcc(vpxcc);
            message.setMpt(mpt);
            message.setSequence(seq);
            message.setSim(sim);
            message.setChannel(channel);
            message.setDataTypeAndSpm(dataTypeByte);
            message.setTimestamp(timestampBytes);
            message.setLastIframeInterval(lastIframeInterval);
            message.setLastFrameInterval(lastFrameInterval);
            message.setLength(dataLength);
            message.parseExtendedFields();

            out.add(message);
            // Per-frame trace: kept at debug level so normal traffic does not flood the log.
            log.debug("Parsed packet: {}", message);
        }
    }

    /**
     * Logs and closes the connection once the discard budget of the current decode call is used up.
     *
     * @return {@code true} when the limit was reached and the caller must stop decoding
     */
    private boolean skipLimitReached(ChannelHandlerContext ctx, int skippedBytes) {
        if (skippedBytes < MAX_SKIP_BYTES) {
            return false;
        }
        log.error("Too many invalid bytes ({}), close connection", skippedBytes);
        ctx.close();
        return true;
    }

    /**
     * Guesses the header layout by checking a 4-byte window for non-BCD nibbles.
     *
     * <p>The window starts {@link #PROBE_OFFSET} bytes after the current reader index. The buffer's
     * reader index and mark are left untouched. The caller must make sure at least
     * {@code PROBE_OFFSET + PROBE_LEN} bytes are readable.
     *
     * <p>NOTE(review): {@code decode} calls this with the reader index just past the sequence
     * number, so the window covers frame offsets 16..19. The original inline comments described
     * probing offsets 14..17 and are not consistent about which outcome means "2019". The effective
     * behaviour is preserved here; verify the heuristic against real device captures.
     *
     * @param in buffer positioned right after the sequence number
     * @return {@code true} if any probed nibble is greater than 9 (treated as the 10-byte SIM
     *         layout), {@code false} if all probed bytes are valid BCD (6-byte SIM layout)
     */
    private boolean detectProtocolVersion(ByteBuf in) {
        final int probeStart = in.readerIndex() + PROBE_OFFSET;
        for (int i = 0; i < PROBE_LEN; i++) {
            int value = in.getUnsignedByte(probeStart + i);
            // A valid BCD byte has both nibbles in the range 0..9.
            if ((value >>> 4) > 9 || (value & 0x0F) > 9) {
                return true;
            }
        }
        return false;
    }

    /**
     * Renders each byte as two lowercase hexadecimal digits.
     *
     * @param bcd bytes to render
     * @return string of {@code 2 * bcd.length} hex digits
     */
    private String bcdToString(byte[] bcd) {
        StringBuilder sb = new StringBuilder(bcd.length * 2);
        for (byte b : bcd) {
            sb.append(HEX_DIGITS[(b >> 4) & 0x0F]).append(HEX_DIGITS[b & 0x0F]);
        }
        return sb.toString();
    }
}