package cn.org.hentai.jtt1078.server.backup;
|
|
import cn.org.hentai.jtt1078.entity.Jt1078Message;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
import lombok.extern.slf4j.Slf4j;
|
|
import java.util.List;
|
|
@Slf4j
|
public class Jt1078MessageDecoder extends ByteToMessageDecoder {
|
private static final int HEADER_MIN_LEN = 34;
|
private static final int MAX_SKIP_BYTES = 4096; // 防错机制
|
|
@Override
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
int skippedBytes = 0;
|
|
while (true) {
|
if (in.readableBytes() < 4) return;
|
|
in.markReaderIndex();
|
int header = in.readInt();
|
|
if ((header & 0x7FFFFFFF) != 0x30316364) {
|
in.resetReaderIndex();
|
in.readByte(); // 滑动窗口继续查找包头
|
skippedBytes++;
|
if (skippedBytes >= MAX_SKIP_BYTES) {
|
log.error("Too many invalid bytes ({}), close connection", skippedBytes);
|
ctx.close();
|
return;
|
}
|
continue;
|
}
|
|
// mark 包头对齐位置
|
in.markReaderIndex();
|
|
// 保证至少读完基本字段
|
if (in.readableBytes() < 1 + 1 + 2) {
|
in.resetReaderIndex();
|
return;
|
}
|
|
byte vpxcc = in.readByte();
|
byte mpt = in.readByte();
|
short seq = in.readShort();
|
|
// 先判断协议版本
|
boolean isV2019 = detectProtocolVersion(in);
|
int simLength = isV2019 ? 10 : 6;
|
|
// 判断总剩余数据是否足够(SIM 卡 + 通道 + 数据类型 + 时间戳 + iframe/frame interval + 长度字段)
|
if (in.readableBytes() < simLength + 1 + 1 + 8 + 2 + 2 + 2) {
|
in.resetReaderIndex();
|
return;
|
}
|
|
byte[] simBytes = new byte[simLength];
|
in.readBytes(simBytes);
|
String sim = bcdToString(simBytes);
|
|
byte channel = in.readByte();
|
byte dataTypeByte = in.readByte();
|
|
byte[] timestampBytes = new byte[8];
|
in.readBytes(timestampBytes);
|
short lastIframeInterval = in.readShort();
|
short lastFrameInterval = in.readShort();
|
|
short dataLength = in.readShort();
|
if (dataLength < 0) {
|
log.warn("Invalid dataLength: {}", dataLength);
|
in.resetReaderIndex();
|
in.readByte(); // 跳1字节再重新查找包头
|
skippedBytes++;
|
continue;
|
}
|
|
if (in.readableBytes() < dataLength) {
|
in.resetReaderIndex();
|
return; // 等待更多数据
|
}
|
|
byte[] data = new byte[dataLength];
|
in.readBytes(data);
|
|
Jt1078Message message = new Jt1078Message();
|
message.setHeader(header);
|
message.setVpxcc(vpxcc);
|
message.setMpt(mpt);
|
message.setSequence(seq);
|
message.setSim(sim);
|
message.setChannel(channel);
|
message.setDataTypeAndSpm(dataTypeByte);
|
message.setTimestamp(timestampBytes);
|
message.setLastIframeInterval(lastIframeInterval);
|
message.setLastFrameInterval(lastFrameInterval);
|
message.setLength(dataLength);
|
message.parseExtendedFields();
|
|
out.add(message);
|
log.warn("Parsed packet: {}", message.toString());
|
}
|
}
|
|
|
private boolean detectProtocolVersion(ByteBuf in) {
|
// 保存原始读取位置
|
in.markReaderIndex();
|
|
try {
|
// 2016协议SIM卡号位置:包头4 + VPXCC1 + MPT1 + SEQ2 = 8字节
|
// 所以SIM卡号在8-13字节(6字节)
|
// 2019协议SIM卡号在8-17字节(10字节)
|
|
// 跳到SIM卡号起始位置
|
in.skipBytes(8);
|
|
// 检查第14-17字节(2016协议这部分是通道号/数据类型,2019协议是SIM卡继续)
|
byte[] probeBytes = new byte[4];
|
in.readBytes(probeBytes);
|
|
// BCD码有效性检查:每半个字节应在0-9之间
|
for (byte b : probeBytes) {
|
if (((b & 0xF0) >>> 4) > 9 || ((b & 0x0F) > 9)) {
|
return true; // 发现非BCD码,可能是2019协议
|
}
|
}
|
return false;
|
} finally {
|
// 无论检测成功与否,都重置读取位置
|
in.resetReaderIndex();
|
}
|
}
|
private String bcdToString(byte[] bcd) {
|
StringBuilder sb = new StringBuilder();
|
for (byte b : bcd) {
|
sb.append(String.format("%02x", b));
|
}
|
return sb.toString();
|
}
|
}
|