package cn.org.hentai.jtt1078.server.talk;
|
|
import cn.org.hentai.jtt1078.entity.Analyze;
|
import cn.org.hentai.jtt1078.entity.enums.ProtocolVersion;
|
import cn.org.hentai.jtt1078.util.ByteHolder;
|
import cn.org.hentai.jtt1078.util.ByteUtils;
|
import cn.org.hentai.jtt1078.util.G711Util;
|
import cn.org.hentai.jtt1078.util.Packet;
|
import cn.org.hentai.jtt1078.websocket.MyWebSocketHandler;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Component;
|
|
import javax.sound.sampled.AudioFormat;
|
import javax.sound.sampled.SourceDataLine;
|
import java.io.IOException;
|
import java.time.Instant;
|
import java.time.ZoneOffset;
|
import java.time.ZonedDateTime;
|
import java.time.format.DateTimeFormatter;
|
import java.util.Arrays;
|
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
|
/**
|
* JTT1078 Protocol Decoder
|
*/
|
@Slf4j
|
@Component
|
public class Jt1078TalkDecoder {
|
private final ByteHolder buffer = new ByteHolder(4096);
|
private static final byte[] HEAD1078 = {0x30, 0x31, 0x63, 0x64};
|
private ProtocolVersion protocolVersion = ProtocolVersion.UNKNOWN;
|
|
public void write(byte[] block) {
|
buffer.write(block);
|
}
|
|
public void write(byte[] block, int startIndex, int length) {
|
byte[] buff = new byte[length];
|
System.arraycopy(block, startIndex, buff, 0, length);
|
write(buff);
|
}
|
|
public Packet decode() {
|
|
if (buffer.size() < 4) return null;
|
|
// Check header
|
if (!Arrays.equals(buffer.array(4), HEAD1078)) {
|
// log.warn("Invalid protocol header, expected:30316364, actual:{}",
|
// ByteUtils.toHexString(buffer.array(4)));
|
buffer.clear();
|
return null;
|
}
|
|
Packet packet;
|
// 第一次自动识别版本
|
if (protocolVersion == ProtocolVersion.UNKNOWN) {
|
packet = autoDetectVersionAndParse();
|
} else {
|
packet = decodePacket(protocolVersion);
|
|
}
|
|
if (packet != null) {
|
// ✅ 统一处理解析与音频
|
Analyze analyze = (protocolVersion == ProtocolVersion.V2013)
|
? parse16(packet.getBytes())
|
: parse19(packet.getBytes());
|
processAudio(analyze);
|
|
}
|
|
return packet;
|
}
|
|
// 自动检测版本并解析
|
private Packet autoDetectVersionAndParse() {
|
// 尝试 2013 版本
|
Packet packet = decodePacket(ProtocolVersion.V2013);
|
if (packet != null) {
|
protocolVersion = ProtocolVersion.V2013;
|
Analyze analyze = parse16(packet.getBytes());
|
System.out.printf("%s-%d device started output%n", analyze.getSim(), analyze.getCh());
|
processAudio(analyze);
|
return packet;
|
}
|
|
// 尝试 2019 版本
|
packet = decodePacket(ProtocolVersion.V2019);
|
if (packet != null) {
|
protocolVersion = ProtocolVersion.V2019;
|
Analyze analyze = parse19(packet.getBytes());
|
System.out.printf("%s-%d device started output%n", analyze.getSim(), analyze.getCh());
|
processAudio(analyze);
|
return packet;
|
}
|
return null;
|
}
|
|
// 根据指定版本解析
|
private Packet decodePacket(ProtocolVersion version) {
|
if (version == ProtocolVersion.UNKNOWN) {
|
return null;
|
}
|
|
int dataTypeIndex = (version == ProtocolVersion.V2013) ? 15 : 19;
|
if (buffer.size() < dataTypeIndex + 1) return null;
|
|
byte dataType = (byte) ((buffer.get(dataTypeIndex) >> 4) & 0x0F);
|
int dataLenIndex;
|
|
switch (dataType) {
|
case 3:
|
dataLenIndex = dataTypeIndex + 9;
|
break;
|
case 4:
|
dataLenIndex = dataTypeIndex + 1;
|
break;
|
default:
|
dataLenIndex = dataTypeIndex + 13;
|
}
|
//log.info("dataType:"+dataType);
|
if (buffer.size() < dataLenIndex + 2) return null;
|
int dataLen = buffer.getShort(dataLenIndex) & 0xFFFF;
|
int packetLength = dataLenIndex + 2 + dataLen;
|
|
if (buffer.size() < packetLength) return null;
|
|
// Check next packet header
|
if (buffer.size() >= packetLength + 4) {
|
byte[] nextHeader = new byte[4];
|
for (int i = 0; i < 4; i++) {
|
nextHeader[i] = buffer.get(packetLength + i);
|
}
|
if (!Arrays.equals(nextHeader, HEAD1078)) {
|
//log.warn("Invalid data type: {}", dataType);
|
buffer.clear();
|
return null;
|
}
|
}
|
|
byte[] packetData = new byte[packetLength];
|
buffer.sliceInto(packetData, packetLength);
|
|
//printPacketInfo(packetData, version);
|
|
return Packet.create(packetData);
|
}
|
|
// 解析16版本数据
|
private Analyze parse16(byte[] data) {
|
Analyze analyze = new Analyze();
|
analyze.setPt((byte) (data[5] & 0x7F));
|
analyze.setSn((short) (((data[6] & 0xFF) << 8) | (data[7] & 0xFF)));
|
|
StringBuilder simBuilder = new StringBuilder();
|
for (int i = 0; i < 6; i++) {
|
simBuilder.append(nextBcd(data, 8 + i));
|
}
|
analyze.setSim(simBuilder.toString());
|
|
analyze.setCh(data[14]);
|
analyze.setDt((byte) ((data[15] >> 4) & 0x0F));
|
analyze.setFi((byte) (data[15] & 0x0F));
|
|
int dataLenIndex;
|
switch (analyze.getDt()) {
|
case 3:
|
analyze.setTimestamp(setTimestamp(data, 16));
|
dataLenIndex = 24;
|
break;
|
case 4:
|
dataLenIndex = 16;
|
break;
|
default:
|
analyze.setTimestamp(setTimestamp(data, 16));
|
analyze.setLastIInterval((short) (((data[24] & 0xFF) << 8) | (data[25] & 0xFF)));
|
analyze.setLastInterval((short) (((data[26] & 0xFF) << 8) | (data[27] & 0xFF)));
|
dataLenIndex = 28;
|
}
|
|
analyze.setDataLen((short) (((data[dataLenIndex] & 0xFF) << 8) | (data[dataLenIndex + 1] & 0xFF)));
|
byte[] payload = new byte[analyze.getDataLen()];
|
System.arraycopy(data, dataLenIndex + 2, payload, 0, payload.length);
|
analyze.setData(payload);
|
|
return analyze;
|
}
|
|
// 解析19版本数据
|
private Analyze parse19(byte[] data) {
|
Analyze analyze = new Analyze();
|
analyze.setPt((byte) (data[5] & 0x7F));
|
analyze.setSn((short) (((data[6] & 0xFF) << 8) | (data[7] & 0xFF)));
|
|
StringBuilder simBuilder = new StringBuilder();
|
for (int i = 0; i < 10; i++) {
|
simBuilder.append(nextBcd(data, 8 + i));
|
}
|
analyze.setSim(simBuilder.toString());
|
|
analyze.setCh(data[18]);
|
analyze.setDt((byte) ((data[19] >> 4) & 0x0F));
|
analyze.setFi((byte) (data[19] & 0x0F));
|
|
int dataLenIndex;
|
switch (analyze.getDt()) {
|
case 3:
|
analyze.setTimestamp(setTimestamp(data, 20));
|
dataLenIndex = 28;
|
break;
|
case 4:
|
dataLenIndex = 20;
|
break;
|
default:
|
analyze.setTimestamp(setTimestamp(data, 20));
|
analyze.setLastIInterval((short) (((data[28] & 0xFF) << 8) | (data[29] & 0xFF)));
|
analyze.setLastInterval((short) (((data[30] & 0xFF) << 8) | (data[31] & 0xFF)));
|
dataLenIndex = 32;
|
}
|
|
analyze.setDataLen((short) (((data[dataLenIndex] & 0xFF) << 8) | (data[dataLenIndex + 1] & 0xFF)));
|
byte[] payload = new byte[analyze.getDataLen()];
|
System.arraycopy(data, dataLenIndex + 2, payload, 0, payload.length);
|
analyze.setData(payload);
|
|
return analyze;
|
}
|
|
private long setTimestamp(byte[] data, int offset) {
|
long timestamp = 0;
|
for (int i = 0; i < 8; i++) {
|
timestamp = (timestamp << 8) | (data[offset + i] & 0xFF);
|
}
|
return timestamp;
|
}
|
|
private String nextBcd(byte[] data, int offset) {
|
byte val = data[offset];
|
int ch1 = (val >> 4) & 0x0F;
|
int ch2 = val & 0x0F;
|
return String.format("%d%d", ch1, ch2);
|
}
|
|
|
private void processAudio(Analyze analyze) {
|
if (analyze.getDt() == 3) {
|
try {
|
processAudioFrame(analyze);
|
} catch (IOException e) {
|
throw new RuntimeException(e);
|
}
|
}
|
}
|
|
// region 本地播放
|
private final int AUDIO_BUFFER_SIZE = 1600; // 100ms的缓冲(8000Hz * 16bit * 1声道 * 0.1秒 /8)
|
private final AudioFormat PCM_FORMAT = new AudioFormat(8000.0f, 16, 1, true, false);
|
private SourceDataLine audioLine;
|
private final BlockingQueue<byte[]> audioQueue = new ArrayBlockingQueue<>(20); // 控制缓冲队列大小
|
private volatile boolean isPlaying = false;
|
private Thread playbackThread;
|
// public Jt1078Decoder() {
|
// 初始化音频输出
|
//initAudioSystem();
|
//}
|
// 初始化音频系统(在构造函数中调用)
|
// private void initAudioSystem() {
|
// try {
|
// DataLine.Info info = new DataLine.Info(SourceDataLine.class, PCM_FORMAT);
|
// audioLine = (SourceDataLine) AudioSystem.getLine(info);
|
//
|
// // 使用更小的缓冲区减少延迟
|
// audioLine.open(PCM_FORMAT, AUDIO_BUFFER_SIZE);
|
// audioLine.start();
|
//
|
// // 启动播放线程
|
// isPlaying = true;
|
// playbackThread = new Thread(this::playbackLoop);
|
// playbackThread.setDaemon(true);
|
// playbackThread.start();
|
//
|
// System.out.println("音频系统初始化完成,缓冲区大小: " + AUDIO_BUFFER_SIZE + " bytes");
|
// } catch (LineUnavailableException e) {
|
// System.err.println("音频设备初始化失败: " + e.getMessage());
|
// }
|
// }
|
|
// 播放线程主循环
|
// private void playbackLoop() {
|
// byte[] silentFrame = new byte[320]; // 20ms静音帧
|
// Arrays.fill(silentFrame, (byte)0);
|
//
|
// while (isPlaying) {
|
// try {
|
// byte[] audioData = audioQueue.poll(50, TimeUnit.MILLISECONDS);
|
//
|
// if (audioData != null) {
|
// audioLine.write(audioData, 0, audioData.length);
|
// } else {
|
// Thread.sleep(5);
|
// }
|
// } catch (InterruptedException e) {
|
// Thread.currentThread().interrupt();
|
// }
|
// }
|
// }
|
// endregion
|
// 改进的音频数据处理方法
|
private void processAudioFrame(Analyze analyze) throws IOException {
|
if (analyze.getDt() != 3) return; // 非音频帧
|
|
byte[] rawData = analyze.getData();
|
if (rawData == null || rawData.length <= 32) return; // 跳过无效帧
|
|
// 1. 提取有效负载(跳过JT1078头部)
|
int payloadOffset = 32; // 根据实际协议调整
|
byte[] g711aPayload = Arrays.copyOfRange(rawData, payloadOffset, rawData.length);
|
|
// 发送给对应 sim 的前端
|
String sim = analyze.getSim(); // 假设 Analyze 对象有 sim 字段
|
|
// 2. 快速解码
|
byte[] pcmData = G711Util.decodeG711AFast(g711aPayload);
|
|
// 4. 发送给前端
|
MyWebSocketHandler.sendAudioToSim(sim, pcmData);
|
|
// // 3. 非阻塞提交到播放队列
|
// if (!audioQueue.offer(pcmData)) {
|
// audioQueue.poll(); // 丢弃最旧帧(而不是 flush 输出流)
|
// audioQueue.offer(pcmData); // 再次尝试加入新帧
|
// }
|
}
|
|
// 关闭时释放资源
|
public void close() {
|
isPlaying = false;
|
if (playbackThread != null) {
|
playbackThread.interrupt();
|
}
|
if (audioLine != null) {
|
audioLine.drain();
|
audioLine.close();
|
}
|
}
|
|
private void printPacketInfo(byte[] packet, ProtocolVersion version) {
|
System.out.println("Raw Packet (Hex):\n" + ByteUtils.toHexString(packet));
|
|
System.out.println("\n=== JTT1078 " + ((version == ProtocolVersion.V2013) ? "2013" : "2019") + " Protocol " +
|
"Packet ===");
|
System.out.println("Header: " + ByteUtils.toHexString(packet, 0, 4));
|
|
Analyze analyze = (version == ProtocolVersion.V2013) ? parse16(packet) : parse19(packet);
|
System.out.println("Payload Type: " + analyze.getPt());
|
System.out.println("Sequence Number: " + analyze.getSn());
|
System.out.println("SIM: " + analyze.getSim());
|
System.out.println("Channel: " + analyze.getCh());
|
String dataType;
|
switch (analyze.getDt()) {
|
case 0:
|
dataType = "Video I Frame";
|
break;
|
case 1:
|
dataType = "Video P Frame";
|
break;
|
case 2:
|
dataType = "Video B Frame";
|
break;
|
case 3:
|
dataType = "Audio Frame";
|
break;
|
case 4:
|
dataType = "Transparent Data";
|
break;
|
default:
|
dataType = "Unknown";
|
break;
|
}
|
System.out.println("Data Type: " + dataType);
|
String FrameInfo;
|
switch (analyze.getFi()) {
|
case 0:
|
FrameInfo = "Atomic Packet";
|
break;
|
case 1:
|
FrameInfo = "First Packet";
|
break;
|
case 2:
|
FrameInfo = "Last Packet";
|
break;
|
case 3:
|
FrameInfo = "Middle Packet";
|
break;
|
default:
|
FrameInfo = "Unknown (" + analyze.getFi() + ")";
|
break;
|
}
|
System.out.println("Frame Info: " + FrameInfo);
|
|
if (analyze.getTimestamp() != 0) {
|
Instant instant = Instant.ofEpochMilli(analyze.getTimestamp());
|
ZonedDateTime beijingTime = instant.atZone(ZoneOffset.ofHours(8));
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
System.out.println("Timestamp: " + formatter.format(beijingTime));
|
}
|
|
if (analyze.getLastIInterval() != 0) {
|
System.out.println("Last I Frame Interval: " + analyze.getLastIInterval());
|
}
|
if (analyze.getLastInterval() != 0) {
|
System.out.println("Last Frame Interval: " + analyze.getLastInterval());
|
}
|
|
System.out.println("Data Length: " + analyze.getDataLen());
|
System.out.println("=================================\n");
|
|
}
|
}
|