package cn.org.hentai.jtt1078.server; import cn.org.hentai.jtt1078.entity.Media; import cn.org.hentai.jtt1078.entity.MediaEncoding; import cn.org.hentai.jtt1078.publisher.Channel; import cn.org.hentai.jtt1078.publisher.PublishManager; import cn.org.hentai.jtt1078.entity.Audio; import cn.org.hentai.jtt1078.util.Packet; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** * Created by matrixy on 2019/4/9. */ public class Jtt1078Handler extends SimpleChannelInboundHandler { static Logger logger = LoggerFactory.getLogger(Jtt1078Handler.class); private static final AttributeKey SESSION_KEY = AttributeKey.valueOf("session-key"); // @Override protected void channelRead011(ChannelHandlerContext ctx, Packet packet) throws Exception { io.netty.channel.Channel nettyChannel = ctx.channel(); // 检查协议版本 packet.seek(14); byte b14 = packet.nextByte(); byte b15 = packet.nextByte(); boolean is2019Protocol = (b14 & 0xFF) == 0x00 && (b15 & 0xFF) == 0x00; int simLength = is2019Protocol ? 10 : 6; // 读取SIM卡和通道号 packet.seek(8); StringBuilder simBuilder = new StringBuilder(); for(int i=0; i {}-{}", Long.toHexString(chl.hashCode() & 0xffffffffL), sim, channel); } Integer sequence = SessionManager.get(nettyChannel, "video-sequence"); if (sequence == null) sequence = 0; // 动态获取数据类型和包类型 int dataTypePos = is2019Protocol ? 19 : 15; packet.seek(dataTypePos); int dataType = (packet.nextByte() >> 4) & 0x0f; int pkType = packet.nextByte() & 0x0f; // 计算数据偏移量 int baseOffset = is2019Protocol ? 32 : 28; int lengthOffset = baseOffset; if (dataType == 0x04) { lengthOffset = baseOffset - 8 - 2 - 2; } else if (dataType == 0x03) { lengthOffset = baseOffset - 4; } int pt = packet.seek(5).nextByte() & 0x7f; int timestampOffset = is2019Protocol ? 20 : 16; if (dataType == 0x00 || dataType == 0x01 || dataType == 0x02) { if (pkType == 0 || pkType == 2) { sequence += 1; SessionManager.set(nettyChannel, "video-sequence", sequence); } long timestamp = packet.seek(timestampOffset).nextLong(); byte[] videoData = packet.seek(lengthOffset + 2).nextBytes(); logger.debug("Publishing video data - size: {}, seq: {}, ts: {}", videoData.length, sequence, timestamp); PublishManager.getInstance().publishVideo(tag, sequence, timestamp, pt, videoData); } else if (dataType == 0x03) { long timestamp = packet.seek(timestampOffset).nextLong(); byte[] audioData = packet.seek(lengthOffset + 2).nextBytes(); logger.debug("Publishing audio data - size: {}, seq: {}, ts: {}", audioData.length, sequence, timestamp); PublishManager.getInstance().publishAudio(tag, sequence, timestamp, pt, audioData); } } @Override protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception { io.netty.channel.Channel nettyChannel = ctx.channel(); // 1. 协议版本检测 boolean is2019Protocol = detectProtocolVersion(packet); int simLength = is2019Protocol ? 10 : 6; // 2. 读取SIM卡号和通道号 packet.seek(8); StringBuilder simBuilder = new StringBuilder(); for (int i = 0; i < simLength; i++) { simBuilder.append(packet.nextBCD()); } String sim = simBuilder.toString(); int channelPos = is2019Protocol ? 18 : 14; packet.seek(channelPos); int channel = packet.nextByte() & 0xff; String tag = sim + "-" + channel; // 3. 会话管理 if (!SessionManager.contains(nettyChannel, "tag")) { Channel chl = PublishManager.getInstance().open(tag); SessionManager.set(nettyChannel, "tag", tag); logger.info("start publishing: {} -> {}-{} (Protocol: {})", Long.toHexString(chl.hashCode() & 0xffffffffL), sim, channel, is2019Protocol ? "2019" : "2016"); } // 4. 数据处理 Integer sequence = SessionManager.get(nettyChannel, "video-sequence"); if (sequence == null) sequence = 0; int dataTypePos = is2019Protocol ? 19 : 15; packet.seek(dataTypePos); int dataType = (packet.nextByte() >> 4) & 0x0f; int pkType = packet.nextByte() & 0x0f; int baseOffset = is2019Protocol ? 32 : 28; int lengthOffset = baseOffset; if (dataType == 0x04) lengthOffset = baseOffset - 8 - 2 - 2; else if (dataType == 0x03) lengthOffset = baseOffset - 4; int pt = packet.seek(5).nextByte() & 0x7f; int timestampOffset = is2019Protocol ? 20 : 16; if (dataType == 0x00 || dataType == 0x01 || dataType == 0x02) { if (pkType == 0 || pkType == 2) { sequence += 1; SessionManager.set(nettyChannel, "video-sequence", sequence); } long timestamp = packet.seek(timestampOffset).nextLong(); byte[] videoData = packet.seek(lengthOffset + 2).nextBytes(); PublishManager.getInstance().publishVideo(tag, sequence, timestamp, pt, videoData); } else if (dataType == 0x03) { long timestamp = packet.seek(timestampOffset).nextLong(); byte[] audioData = packet.seek(lengthOffset + 2).nextBytes(); PublishManager.getInstance().publishAudio(tag, sequence, timestamp, pt, audioData); } } private boolean detectProtocolVersion(Packet packet) { // 方法1:检查6字节SIM卡号后的填充 packet.seek(8 + 6); // 2016协议SIM卡结束位置 byte b14 = packet.nextByte(); byte b15 = packet.nextByte(); // 如果是2019协议,这里应该是0x00填充 if ((b14 & 0xFF) == 0x00 && (b15 & 0xFF) == 0x00) { return true; } // 方法2:检查数据类型位置的有效性 packet.seek(15); byte dtByte = packet.nextByte(); int dataType = (dtByte >> 4) & 0x0F; int frameType = dtByte & 0x0F; // 如果数据类型或帧类型无效,可能是2019协议 if (dataType > 4 || frameType > 3) { return true; } // 默认返回2016协议 return false; } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); release(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // super.exceptionCaught(ctx, cause); cause.printStackTrace(); release(ctx.channel()); ctx.close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { String tag = SessionManager.get(ctx.channel(), "tag"); logger.info("read timeout: {}",tag); release(ctx.channel()); } } } private void release(io.netty.channel.Channel channel) { String tag = SessionManager.get(channel, "tag"); if (tag != null) { logger.info("close netty channel: {}", tag); PublishManager.getInstance().close(tag); } } }