package cn.org.hentai.jtt1078.server;
|
|
import cn.org.hentai.jtt1078.entity.Media;
|
import cn.org.hentai.jtt1078.entity.MediaEncoding;
|
import cn.org.hentai.jtt1078.publisher.Channel;
|
import cn.org.hentai.jtt1078.publisher.PublishManager;
|
import cn.org.hentai.jtt1078.entity.Audio;
|
import cn.org.hentai.jtt1078.util.Packet;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.util.Attribute;
|
import io.netty.util.AttributeKey;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import java.util.Iterator;
|
import io.netty.handler.timeout.IdleState;
|
import io.netty.handler.timeout.IdleStateEvent;
|
|
/**
|
* Created by matrixy on 2019/4/9.
|
*/
|
public class Jtt1078Handler extends SimpleChannelInboundHandler<Packet>
|
{
|
static Logger logger = LoggerFactory.getLogger(Jtt1078Handler.class);
|
private static final AttributeKey<Session> SESSION_KEY = AttributeKey.valueOf("session-key");
|
|
// @Override
|
protected void channelRead011(ChannelHandlerContext ctx, Packet packet) throws Exception {
|
io.netty.channel.Channel nettyChannel = ctx.channel();
|
|
// 检查协议版本
|
packet.seek(14);
|
byte b14 = packet.nextByte();
|
byte b15 = packet.nextByte();
|
boolean is2019Protocol = (b14 & 0xFF) == 0x00 && (b15 & 0xFF) == 0x00;
|
int simLength = is2019Protocol ? 10 : 6;
|
|
// 读取SIM卡和通道号
|
packet.seek(8);
|
StringBuilder simBuilder = new StringBuilder();
|
for(int i=0; i<simLength; i++) {
|
simBuilder.append(packet.nextBCD());
|
}
|
String sim = simBuilder.toString();
|
int channel = packet.nextByte() & 0xff;
|
String tag = sim + "-" + channel;
|
|
if (!SessionManager.contains(nettyChannel, "tag")) {
|
Channel chl = PublishManager.getInstance().open(tag);
|
SessionManager.set(nettyChannel, "tag", tag);
|
logger.info("start publishing: {} -> {}-{}", Long.toHexString(chl.hashCode() & 0xffffffffL), sim, channel);
|
}
|
|
Integer sequence = SessionManager.get(nettyChannel, "video-sequence");
|
if (sequence == null) sequence = 0;
|
|
// 动态获取数据类型和包类型
|
int dataTypePos = is2019Protocol ? 19 : 15;
|
packet.seek(dataTypePos);
|
int dataType = (packet.nextByte() >> 4) & 0x0f;
|
int pkType = packet.nextByte() & 0x0f;
|
|
// 计算数据偏移量
|
int baseOffset = is2019Protocol ? 32 : 28;
|
int lengthOffset = baseOffset;
|
if (dataType == 0x04) {
|
lengthOffset = baseOffset - 8 - 2 - 2;
|
} else if (dataType == 0x03) {
|
lengthOffset = baseOffset - 4;
|
}
|
|
int pt = packet.seek(5).nextByte() & 0x7f;
|
int timestampOffset = is2019Protocol ? 20 : 16;
|
|
if (dataType == 0x00 || dataType == 0x01 || dataType == 0x02) {
|
if (pkType == 0 || pkType == 2) {
|
sequence += 1;
|
SessionManager.set(nettyChannel, "video-sequence", sequence);
|
}
|
long timestamp = packet.seek(timestampOffset).nextLong();
|
byte[] videoData = packet.seek(lengthOffset + 2).nextBytes();
|
logger.debug("Publishing video data - size: {}, seq: {}, ts: {}", videoData.length, sequence, timestamp);
|
PublishManager.getInstance().publishVideo(tag, sequence, timestamp, pt, videoData);
|
}
|
else if (dataType == 0x03) {
|
long timestamp = packet.seek(timestampOffset).nextLong();
|
byte[] audioData = packet.seek(lengthOffset + 2).nextBytes();
|
logger.debug("Publishing audio data - size: {}, seq: {}, ts: {}", audioData.length, sequence, timestamp);
|
PublishManager.getInstance().publishAudio(tag, sequence, timestamp, pt, audioData);
|
}
|
}
|
@Override
|
protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {
|
io.netty.channel.Channel nettyChannel = ctx.channel();
|
|
// 1. 协议版本检测
|
boolean is2019Protocol = detectProtocolVersion(packet);
|
int simLength = is2019Protocol ? 10 : 6;
|
|
// 2. 读取SIM卡号和通道号
|
packet.seek(8);
|
StringBuilder simBuilder = new StringBuilder();
|
for (int i = 0; i < simLength; i++) {
|
simBuilder.append(packet.nextBCD());
|
}
|
String sim = simBuilder.toString();
|
|
int channelPos = is2019Protocol ? 18 : 14;
|
packet.seek(channelPos);
|
int channel = packet.nextByte() & 0xff;
|
String tag = sim + "-" + channel;
|
|
// 3. 会话管理
|
if (!SessionManager.contains(nettyChannel, "tag")) {
|
Channel chl = PublishManager.getInstance().open(tag);
|
SessionManager.set(nettyChannel, "tag", tag);
|
logger.info("start publishing: {} -> {}-{} (Protocol: {})",
|
Long.toHexString(chl.hashCode() & 0xffffffffL),
|
sim, channel,
|
is2019Protocol ? "2019" : "2016");
|
}
|
|
// 4. 数据处理
|
Integer sequence = SessionManager.get(nettyChannel, "video-sequence");
|
if (sequence == null) sequence = 0;
|
|
int dataTypePos = is2019Protocol ? 19 : 15;
|
packet.seek(dataTypePos);
|
int dataType = (packet.nextByte() >> 4) & 0x0f;
|
int pkType = packet.nextByte() & 0x0f;
|
|
int baseOffset = is2019Protocol ? 32 : 28;
|
int lengthOffset = baseOffset;
|
if (dataType == 0x04) lengthOffset = baseOffset - 8 - 2 - 2;
|
else if (dataType == 0x03) lengthOffset = baseOffset - 4;
|
|
int pt = packet.seek(5).nextByte() & 0x7f;
|
int timestampOffset = is2019Protocol ? 20 : 16;
|
|
if (dataType == 0x00 || dataType == 0x01 || dataType == 0x02) {
|
if (pkType == 0 || pkType == 2) {
|
sequence += 1;
|
SessionManager.set(nettyChannel, "video-sequence", sequence);
|
}
|
long timestamp = packet.seek(timestampOffset).nextLong();
|
byte[] videoData = packet.seek(lengthOffset + 2).nextBytes();
|
PublishManager.getInstance().publishVideo(tag, sequence, timestamp, pt, videoData);
|
}
|
else if (dataType == 0x03) {
|
long timestamp = packet.seek(timestampOffset).nextLong();
|
byte[] audioData = packet.seek(lengthOffset + 2).nextBytes();
|
PublishManager.getInstance().publishAudio(tag, sequence, timestamp, pt, audioData);
|
}
|
}
|
|
private boolean detectProtocolVersion(Packet packet) {
|
// 方法1:检查6字节SIM卡号后的填充
|
packet.seek(8 + 6); // 2016协议SIM卡结束位置
|
byte b14 = packet.nextByte();
|
byte b15 = packet.nextByte();
|
|
// 如果是2019协议,这里应该是0x00填充
|
if ((b14 & 0xFF) == 0x00 && (b15 & 0xFF) == 0x00) {
|
return true;
|
}
|
|
// 方法2:检查数据类型位置的有效性
|
packet.seek(15);
|
byte dtByte = packet.nextByte();
|
int dataType = (dtByte >> 4) & 0x0F;
|
int frameType = dtByte & 0x0F;
|
|
// 如果数据类型或帧类型无效,可能是2019协议
|
if (dataType > 4 || frameType > 3) {
|
return true;
|
}
|
|
// 默认返回2016协议
|
return false;
|
}
|
@Override
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception
|
{
|
super.channelInactive(ctx);
|
release(ctx.channel());
|
}
|
|
@Override
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
|
{
|
// super.exceptionCaught(ctx, cause);
|
cause.printStackTrace();
|
release(ctx.channel());
|
ctx.close();
|
}
|
|
@Override
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
|
IdleStateEvent event = (IdleStateEvent) evt;
|
if (event.state() == IdleState.READER_IDLE) {
|
String tag = SessionManager.get(ctx.channel(), "tag");
|
logger.info("read timeout: {}",tag);
|
release(ctx.channel());
|
}
|
}
|
}
|
|
private void release(io.netty.channel.Channel channel)
|
{
|
String tag = SessionManager.get(channel, "tag");
|
if (tag != null)
|
{
|
logger.info("close netty channel: {}", tag);
|
PublishManager.getInstance().close(tag);
|
}
|
}
|
}
|