From 39d4048dc6fd5a138bd1128c06bccca08fbc72f0 Mon Sep 17 00:00:00 2001 From: 18045010223 <zjbassadmin@> Date: 星期二, 05 八月 2025 08:49:09 +0800 Subject: [PATCH] 解决2019协议设备播放问题 --- src/main/java/cn/org/hentai/jtt1078/codec/ADPCMCodec.java | 4 src/main/java/cn/org/hentai/jtt1078/codec/G711UCodec.java | 2 .classpath | 40 src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078Handler.java | 150 +++ .settings/org.eclipse.core.resources.prefs | 4 src/main/java/cn/org/hentai/jtt1078/entity/Jt1078Message.java | 64 + src/main/resources/multimedia.html | 9 src/main/java/cn/org/hentai/jtt1078/app/HttpServer.java | 65 + src/main/java/cn/org/hentai/jtt1078/Jtt1078Application.java | 12 src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkDecoder.java | 422 +++++++++ src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078MessageDecoder.java | 38 src/main/java/cn/org/hentai/jtt1078/util/AudioConverter.java | 70 + src/main/resources/video.html | 2 src/main/java/cn/org/hentai/jtt1078/app/TalkServer.java | 57 + src/main/java/cn/org/hentai/jtt1078/util/AudioFileWriter.java | 30 .project | 23 README.md | 221 ---- src/main/java/cn/org/hentai/jtt1078/entity/Analyze.java | 18 src/main/java/cn/org/hentai/jtt1078/test/VideoPushTest.java | 5 src/main/java/cn/org/hentai/jtt1078/entity/AudioSendData.java | 33 src/main/java/cn/org/hentai/jtt1078/publisher/Channel.java | 2 src/main/java/cn/org/hentai/jtt1078/app/VideoServer.java | 61 + src/main/java/cn/org/hentai/jtt1078/server/backup/Jt1078MessageDecoder.java | 145 +++ src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java | 3 src/main/java/cn/org/hentai/jtt1078/websocket/CustomHandshakeInterceptor.java | 30 src/main/java/cn/org/hentai/jtt1078/entity/enums/ProtocolVersion.java | 7 src/main/java/cn/org/hentai/jtt1078/server/backup/Jt1078Handler.java | 76 + pom.xml | 43 src/main/java/cn/org/hentai/jtt1078/test/RTPGenerate.java | 5 src/main/java/cn/org/hentai/jtt1078/websocket/MyWebSocketHandler.java | 84 + .settings/org.eclipse.m2e.core.prefs | 4 src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078Decoder.java | 299 ++++++ src/main/java/cn/org/hentai/jtt1078/server/audio/Jt1078AudioEncoder.java | 121 ++ doc/JTT 1078-2016.pdf | 0 src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkMessageDecoder.java | 35 src/main/java/cn/org/hentai/jtt1078/util/TimeUtils.java | 46 + src/main/java/cn/org/hentai/jtt1078/websocket/WebSocketConfig.java | 26 src/main/java/cn/org/hentai/jtt1078/server/audio/Jt1078AudioSenderService.java | 76 + src/main/java/cn/org/hentai/jtt1078/test/VideoServer.java | 2 src/main/resources/audio.html | 3 src/main/java/cn/org/hentai/jtt1078/util/ByteUtils.java | 103 ++ src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkHandler.java | 154 +++ src/main/java/cn/org/hentai/jtt1078/app/ServerApp.java | 34 /dev/null | 19 .settings/org.eclipse.jdt.core.prefs | 8 src/main/java/cn/org/hentai/jtt1078/util/ByteHolder.java | 8 src/main/java/cn/org/hentai/jtt1078/util/Packet.java | 2 src/main/java/cn/org/hentai/jtt1078/util/G711Util.java | 33 src/main/resources/app.properties | 12 src/main/java/cn/org/hentai/jtt1078/test/UnPack.java | 2 src/main/resources/application.yml | 10 51 files changed, 2,437 insertions(+), 285 deletions(-) diff --git a/.classpath b/.classpath new file mode 100644 index 0000000..f7e4a1d --- /dev/null +++ b/.classpath @@ -0,0 +1,40 @@ +<?xml version="1.0" encoding="UTF-8"?> +<classpath> + <classpathentry kind="src" output="target/classes" path="src/main/java"> + <attributes> + <attribute name="optional" value="true"/> + <attribute name="maven.pomderived" value="true"/> + </attributes> + </classpathentry> + <classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"> + <attributes> + <attribute name="maven.pomderived" value="true"/> + <attribute name="optional" value="true"/> + </attributes> + </classpathentry> + <classpathentry kind="src" output="target/test-classes" path="src/test/java"> + <attributes> + <attribute name="optional" value="true"/> + <attribute name="maven.pomderived" value="true"/> + <attribute name="test" value="true"/> + </attributes> + </classpathentry> + <classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources"> + <attributes> + <attribute name="maven.pomderived" value="true"/> + <attribute name="test" value="true"/> + <attribute name="optional" value="true"/> + </attributes> + </classpathentry> + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"> + <attributes> + <attribute name="maven.pomderived" value="true"/> + </attributes> + </classpathentry> + <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"> + <attributes> + <attribute name="maven.pomderived" value="true"/> + </attributes> + </classpathentry> + <classpathentry kind="output" path="target/classes"/> +</classpath> diff --git a/.project b/.project new file mode 100644 index 0000000..99d0611 --- /dev/null +++ b/.project @@ -0,0 +1,23 @@ +<?xml version="1.0" encoding="UTF-8"?> +<projectDescription> + <name>jtt1078-video-server</name> + <comment></comment> + <projects> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments> + </arguments> + </buildCommand> + <buildCommand> + <name>org.eclipse.m2e.core.maven2Builder</name> + <arguments> + </arguments> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.jdt.core.javanature</nature> + <nature>org.eclipse.m2e.core.maven2Nature</nature> + </natures> +</projectDescription> diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs new file mode 100644 index 0000000..abdea9a --- /dev/null +++ b/.settings/org.eclipse.core.resources.prefs @@ -0,0 +1,4 @@ +eclipse.preferences.version=1 +encoding//src/main/java=UTF-8 +encoding//src/main/resources=UTF-8 +encoding/<project>=UTF-8 diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..2f5cc74 --- /dev/null +++ b/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,8 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8 +org.eclipse.jdt.core.compiler.compliance=1.8 +org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore +org.eclipse.jdt.core.compiler.release=disabled +org.eclipse.jdt.core.compiler.source=1.8 diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs new file mode 100644 index 0000000..f897a7f --- /dev/null +++ b/.settings/org.eclipse.m2e.core.prefs @@ -0,0 +1,4 @@ +activeProfiles= +eclipse.preferences.version=1 +resolveWorkspaceProjects=true +version=1 diff --git a/README.md b/README.md index 778f66c..5bbe3b9 100644 --- a/README.md +++ b/README.md @@ -1,216 +1,15 @@ -## 鐩綍 -<ol> - <li><a href="#jtt1078-video-server">绠�浠嬭鏄�</a></li> - <li><a href="#鍒嗘敮璇存槑">鍒嗘敮璇存槑</a></li> - <li><a href="#椤圭洰璇存槑">椤圭洰璇存槑</a></li> - <li><a href="#鍑嗗宸ュ叿">鍑嗗宸ュ叿</a></li> - <li><a href="#娴嬭瘯姝ラ">娴嬭瘯姝ラ</a></li> - <li><a href="#娴嬭瘯鐜">娴嬭瘯鐜</a></li> - <li><a href="#TODO">TODO</a></li> - <li><a href="#鑷磋阿">鑷磋阿</a></li> - <li><a href="#鎺ㄨ崘缇ゅ弸椤圭洰">鎺ㄨ崘缇ゅ弸椤圭洰</a></li> - <li><a href="#浜ゆ祦璁ㄨ">浜ゆ祦璁ㄨ</a></li> -</ol> - -[//]: # (<div align="center"><img src="./doc/1078.png" /></div>) - -<hr /> +# 鐩綍 ## jtt1078-video-server -鍩轰簬JT/T 1078鍗忚瀹炵幇鐨勮棰戣浆鎾湇鍔″櫒锛屽綋杞︽満鏈嶅姟鍣ㄧ涓诲姩涓嬪彂**闊宠棰戝疄鏃朵紶杈撴帶鍒�**娑堟伅锛�0x9101锛夊悗锛岃溅杞界粓绔繛鎺ュ埌姝ゆ湇鍔″櫒鍚庯紝鍙戦�佹寚瀹氭憚鍍忓ご鎵�閲囬泦鐨勮棰戞祦锛屾椤圭洰鏈嶅姟鍣ㄥ畬鎴愰煶瑙嗛鏁版嵁鎺ユ敹骞惰浆鐮侊紝瀹屾垚杞挱鐨勬祦绋嬶紝鎻愪緵鍚勫钩鍙扮殑鎾斁鏀拺銆� +鍏煎 2019 鍗忚 -鍚屾椂锛屾湰椤圭洰鍦ㄩ厤缃� **ffmpeg璺緞** 鍙� **rtmp url** 鍚庯紝灏嗗悓鏃惰緭鍑轰竴璺埌 **RTMP** 鏈嶅姟鍣ㄤ笂鍘伙紝涓虹Щ鍔ㄧ鎾斁鎻愪緵闊宠棰戞敮鎸侊紙娉ㄦ剰锛岀敱浜庢梺璺殑RTMP娴佹槸閫氳繃ffmpeg瀛愯繘绋嬪疄鐜帮紝骞朵笖鏈夐煶棰戣浆鐮佺殑杩囩▼锛屾墍浠ユ�ц兘灏嗘湁寰堝ぇ鐨勪笅闄嶏級銆� +### 璇存槑 +- 808 鐨� 2013 鍗忚瀵瑰簲 1078 鐨� 2016 鍗忚 +- 808 鐨� 2019 鍗忚瀵瑰簲 1078 鐨� 2019 鍗忚 +- 铏界劧鎵句笉鍒� 1078 鐨� 2019 鍗忚锛屼絾鏄熀浜� 2016 鍗忚锛屼富瑕佸尯鍒槸 SIM 鍙蜂綅鏁颁笉鍚岋細 -> 闈炲父鎰熻阿 **瀛ゅ嘲璧忔湀/hx锛圼github/jelycom](https://github.com/jelycom)锛�** 鎻愪緵鐨刴p3闊抽鏀寔銆� +| 鍗忚鐗堟湰 | SIM 浣嶆暟 | +|--------|-----------| +| 2016 | 6 浣� | +| 2019 | 10 浣� | -## 鍒嗘敮璇存槑 -鍘熼」鐩湁4涓垎鏀笉鍚岀殑瀹炵幇鏂瑰紡锛岀幇灏嗗叾瀹冨垎鏀叏閮ㄥ垹闄わ紝宸茬粡鐢ㄤ笉涓婁簡銆� -閰嶇疆浜唂fmpeg鍜宺tmp锛屽彲浠ユ兂鍔炴硶鍚屾椂杈撳嚭鍒版瘮濡侶LS绛夈�� - -> 鏈夊叾瀹冭瑷�鐨勫紑鍙戣�咃紝鍙互鍙傝�冩垜鐨勨�淸JTT/1078闊宠棰戜紶杈撳崗璁紑鍙戞寚鍗梋(https://www.hentai.org.cn/article?id=8)鈥濓紝鎴戞墍鐭ラ亾鐨勫畼鏂规枃妗i噷鐨勯敊璇垨鏄己闄蜂互鍙婂潙锛屾垜鍏ㄩ儴鍐欎簡涓嬫潵锛屽笇鏈涘浣犳湁甯姪銆� - -### 椤圭洰璇存槑 -鏈」鐩帴鏀舵潵鑷簬杞﹁浇缁堢鍙戣繃鏉ョ殑闊宠棰戞暟鎹紝瑙嗛鐩存帴灏佽涓篎LV TAG锛岄煶棰戝畬鎴怗.711A銆丟.711U銆丄DPCMA銆丟726鍒癙CM鐨勮浆鐮侊紝骞朵娇鐢∕P3鍘嬬缉鍚庡啀灏佽涓篎LV TAG銆� - -#### 瑙嗛缂栫爜鏀寔 -鐩墠鍑犱箮鎵�鏈夌殑缁堢瑙嗛锛岄粯璁ょ殑瑙嗛缂栫爜閮芥槸h264锛屾墦鍖呮垚flv涔熸槸闈炲父绠�鍗曠殑锛屾湁涓埆鍘傚浣跨敤avs锛屼絾鏄垜娌℃湁纰板埌杩囥�傛湰椤圭洰鐩墠涔熷彧鏀寔h264缂栫爜鐨勮棰戙�� - -#### 闊抽缂栫爜鏀寔 -|闊抽缂栫爜|鏀寔|澶囨敞| -|---|---|---| -|G.711A|Y|鏀寔| -|G.711U|Y|鏀寔| -|ADPCMA|Y|鏀寔| -|G.726|Y|鏀寔| - -闊抽缂栫爜澶锛屼篃娌¢偅涔堝璁惧鍙互娴嬭瘯鐨勶紝姣旇緝甯歌鐨勫氨G.711A鍜孉DPCMA杩欎袱绉嶏紝鏈▼搴忓浜庝笉鏀寔鐨勯煶棰戯紝灏嗕綔 **闈欓煶澶勭悊** 銆� - -#### 闊抽缂栫爜杞爜鎵╁睍瀹炵幇 -缁ф壙骞跺疄鐜癭AudioCodec`绫荤殑鎶借薄鏂规硶锛屽畬鎴愪换鎰忛煶棰戝埌PCM缂栫爜鐨勮浆鐮佽繃绋嬶紝骞朵笖琛ュ厖`AudioCodec.getCodec()`宸ュ巶鏂规硶鍗冲彲銆俙AudioCodec`鎶借薄绫诲師鍨嬪涓嬶細 -```java -public abstract class AudioCodec -{ - // 杞崲鑷砅CM - public abstract byte[] toPCM(byte[] data); - // 鐢盤CM杞负褰撳墠缂栫爜锛屽彲浠ョ暀绌猴紝鍙嶆鍙堟病鏈夎皟鐢� - public abstract byte[] fromPCM(byte[] data); -} -``` - -### 鍑嗗宸ュ叿 -椤圭洰閲屽噯澶囦簡涓�涓祴璇曠▼搴忥紙`src/main/java/cn.org.hentai.jtt1078.test.VideoPushTest.java`锛夛紝浠ュ強涓�涓暟鎹枃浠讹紙`src/main/resources/tcpdump.bin`锛夛紝鏁版嵁鏂囦欢鏄�氳繃宸ュ叿閲囬泦鐨勪竴娈靛嚑鍒嗛挓鏃堕暱鐨勮溅杞界粓绔彂閫佷笂鏉ョ殑鍘熷娑堟伅鍖咃紝娴嬭瘯绋嬪簭鍙互鎸佺画涓嶆柇鐨勩�佹參鎱㈢殑鍙戦�佹暟鎹枃浠堕噷鐨勫唴瀹癸紝鐢ㄦ潵妯℃嫙杞﹁浇缁堢鍙戦�佽棰戞祦鐨勮繃绋嬨�� - -鍙﹀锛屾柊澧炰簡 `cn.org.hentai.jtt1078.test.RTPGenerate` 绫伙紝鐢ㄤ簬璇诲彇bin鏂囦欢锛屽苟涓斾慨鏀筍IM鍗″彿鍜岄�氶亾鍙凤紝鍒涘缓澶ч噺鏁版嵁鏂囦欢浠ヤ究浜庡帇鍔涙祴璇曘�� - -### 娴嬭瘯姝ラ -1. 閰嶇疆濂芥湇鍔″櫒绔紝淇敼`app.properties`閲岀殑閰嶇疆椤广�� -2. 鐩存帴鍦↖DE閲岃繍琛宍cn.org.hentai.jtt1078.app.VideoServerApp`锛屾垨瀵归」鐩繘琛屾墦鍖咃紝鎵ц`mvn package`锛屾墽琛宍java -jar jtt1078-video-server-1.0-SNAPSHOT.jar`鏉ュ惎鍔ㄦ湇鍔″櫒绔�� -3. 杩愯`VideoPushTest.java`锛屽紑濮嬫ā鎷熻溅杞界粓绔殑瑙嗛鎺ㄩ�併�� -4. 寮�濮嬪悗锛屾帶鍒跺彴閲屼細杈撳嚭鏄剧ず**start publishing: 013800138999-2**鐨勫瓧鏍� -5. 鎵撳紑娴忚鍣紝杈撳叆 **http://localhost:3333/test/multimedia#013800138999-2** 鍚庡洖杞� -6. 鐐瑰嚮缃戦〉涓婄殑**play video**锛屽紑濮嬫挱鏀捐棰� - -### 娴嬭瘯鐜 -鎴戝湪鎴戣嚜宸辩殑VPS涓婃惌寤轰簡涓�涓�1078闊宠棰戠幆澧冿紝瀹屽叏浣跨敤浜�**flv**鍒嗘敮涓婄殑浠g爜鏉ュ垱寤猴紝鍚勪綅鍙互璁╃粓绔皢闊宠棰戝彂閫佸埌姝ゆ湇鍔″櫒鎴栨槸浣跨敤**netcat**绛夌綉缁滃伐鍏峰彂閫佹ā鎷熸暟鎹潵浠跨湡缁堢锛屾潵浣撻獙闊宠棰戠殑鏁堟灉銆備笅闈㈡垜浠涓�涓嬮�氳繃**netcat**鏉ユā鎷熺粓绔殑鏂规硶锛� - -|鏍囬|璇存槑| -|---|---| -|1078闊宠棰戞湇鍔″櫒|185.251.248.4:10780| -|瀹炴椂闊宠棰戞挱鏀鹃〉闈http://1078.hentai.org.cn/test/multimedia#SIM-CHANNEL| - -1. 棣栧厛锛屾湰椤圭洰鐨� **/src/main/resources/** 涓嬬殑 **tcpdump.bin** 鍗充负鎴戞姄鍖呭瓨涓嬫潵鐨勭粓绔煶瑙嗛鏁版嵁鏂囦欢锛岄�氳繃`cat tcpdump.bin | pv -L 40k -q | nc 185.251.248.4 10780`鍗冲彲浠ユ瘡绉�40kBPS鐨勯�熷害锛屽悜鏈嶅姟鍣ㄧ鎸佺画鐨勫彂閫佹暟鎹�� -2. 鍦ㄦ祻瑙堝櫒閲屾墦寮�**http://1078.hentai.org.cn/test/multimedia#SIM-CHANNEL** 锛堟敞鎰忔浛鎹㈡帀鍚庨潰鐨凷IM鍜孋HANNEL锛屽嵆缁堢鐨凷IM鍗″彿锛屼笉瓒�12浣嶅墠闈㈣ˉ0锛孋HANNEL鍗充负閫氶亾鍙凤級锛岀劧鍚庣偣鍑荤綉椤典笂鐨�**play video**鍗冲彲銆� - -> 鐢变簬鎴戠殑鏈嶅姟鍣↖P闅忔椂鍙兘浼氬彂鐢熷彉鍖栵紝寤鸿鍦ㄥ皾璇曡繛鎺ユ祴璇曟湇鍔″櫒鍓嶏紝鍏堥�氳繃`ping www.hentai.org.cn`鏉ョ‘瀹氭渶鏂扮殑IP銆� - -### 椤圭洰鏂囦欢璇存槑 -``` - - -鈹溾攢鈹� doc -鈹偮犅� 鈹溾攢鈹� 1078.png锛堝浘鏍囷級 -鈹偮犅� 鈹斺攢鈹� ffmpeg.png -鈹溾攢鈹� LICENSE锛堝紑婧愬崗璁級 -鈹溾攢鈹� pom.xml -鈹溾攢鈹� README.md锛堥」鐩鏄庯級 -鈹溾攢鈹� src -鈹偮犅� 鈹斺攢鈹� main -鈹偮犅� 鈹溾攢鈹� java -鈹偮犅� 鈹偮犅� 鈹斺攢鈹� cn -鈹偮犅� 鈹偮犅� 鈹斺攢鈹� org -鈹偮犅� 鈹偮犅� 鈹斺攢鈹� hentai -鈹偮犅� 鈹偮犅� 鈹斺攢鈹� jtt1078 -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� app -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹斺攢鈹� VideoServerApp.java锛堜富鍏ュ彛绋嬪簭锛� -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� codec -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� ADPCMCodec.java锛圓DPCM缂栬В鐮佸櫒锛� -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� AudioCodec.java锛堥煶棰戠紪瑙g爜鎶借薄鐖剁被锛� -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� G711Codec.java锛圙711A/alaw缂栬В鐮佸櫒锛� -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� G711UCodec.java锛圙711U/ulaw缂栬В鐮佸櫒锛� -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� g726锛圙726缂栬В鐮佸疄鐜帮級 -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� G726_16.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� G726_24.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� G726_32.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� G726_40.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� G726.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹斺攢鈹� G726State.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� G726Codec.java锛圙726缂栬В鐮佸櫒锛� -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� MP3Encoder.java锛圥CM鍒癕P3鍘嬬缉缂栫爜鍣級 -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹斺攢鈹� SilenceCodec.java锛堥潤闊冲寲瑙g爜鍣級 -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� entity -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� Audio.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� MediaEncoding.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� Media.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹斺攢鈹� Video.java -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� flv -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� AudioTag.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� FlvAudioTagEncoder.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� FlvEncoder.java锛圚264鍒癋LV灏佽缂栫爜鍣級 -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹斺攢鈹� FlvTag.java -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� http锛堝唴缃瓾TTP鏈嶅姟锛屾彁渚汬TTP-CHUNKED浼犺緭鏀寔锛� -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� GeneralResponseWriter.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹斺攢鈹� NettyHttpServerHandler.java -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� publisher -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� Channel.java锛堜竴涓�氶亾涓�涓狢hannel瀹炰緥锛孲ubscriber璁㈤槄Channel涓婄殑闊抽涓庤棰戯級 -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹斺攢鈹� PublishManager.java锛堢鐞咰hannel鍜孲ubscriber锛� -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� server锛堣礋璐e畬鎴�1078 RTP娑堟伅鍖呯殑鎺ユ敹鍜岃В鐮侊級 -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� Jtt1078Decoder.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� Jtt1078Handler.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� Jtt1078MessageDecoder.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹斺攢鈹� Session.java -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� subscriber -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� RTMPPublisher.java锛堥�氳繃ffmpeg瀛愯繘绋嬪皢http-flv鍙﹀浼犺緭涓�浠藉埌RTMP鏈嶅姟鍣ㄧ殑瀹炵幇锛� -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� Subscriber.java锛堣闃呰�呮娊璞$被瀹氫箟锛� -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹斺攢鈹� VideoSubscriber.java锛堣棰戣闃呰�咃級 -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� test锛堟祴璇曚唬鐮侊級 -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� AudioTest.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� ChannelTest.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� FuckTest.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� G711ATest.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� MP3Test.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� RTPGenerate.java锛堥�氳繃璇诲彇鍘熷娑堟伅鏁版嵁鏂囦欢锛屽垱寤篘涓慨鏀逛簡sim鍗″彿鐨勬柊鏁版嵁鏂囦欢锛屽彲鐢ㄤ簬鍘嬪姏娴嬭瘯锛� -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� VideoPushTest.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹溾攢鈹� VideoServer.java -鈹偮犅� 鈹偮犅� 鈹偮犅� 鈹斺攢鈹� WAVTest.java -鈹偮犅� 鈹偮犅� 鈹斺攢鈹� util -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� ByteBufUtils.java -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� ByteHolder.java -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� ByteUtils.java -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� Configs.java -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� FileUtils.java -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� FLVUtils.java -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� Packet.java -鈹偮犅� 鈹偮犅� 鈹斺攢鈹� WAVUtils.java -鈹偮犅� 鈹斺攢鈹� resources -鈹偮犅� 鈹溾攢鈹� app.properties锛堜富閰嶇疆鏂囦欢锛� -鈹偮犅� 鈹溾攢鈹� audio.html -鈹偮犅� 鈹溾攢鈹� g726 -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� in_16.g726 -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� in_24.g726 -鈹偮犅� 鈹偮犅� 鈹溾攢鈹� in_32.g726 -鈹偮犅� 鈹偮犅� 鈹斺攢鈹� in_40.g726 -鈹偮犅� 鈹溾攢鈹� log4j.properties -鈹偮犅� 鈹溾攢鈹� multimedia.html锛堟祴璇曠敤闊宠棰戞挱鏀鹃〉闈級 -鈹偮犅� 鈹溾攢鈹� tcpdump.bin锛堟祴璇曠敤鏁版嵁鏂囦欢锛岄煶棰慉DPCM鍚捣鎬濆ご锛岃棰慔264锛� -鈹偮犅� 鈹溾攢鈹� nginx_sample.conf锛圢GINX鍙嶅悜浠g悊鏍蜂緥锛岃В鍐�6璺苟鍙戦棶棰橈級 -鈹偮犅� 鈹溾攢鈹� test.html -鈹偮犅� 鈹斺攢鈹� video.html -``` - -### 椤圭洰鎵撳寘璇存槑 -閫氳繃**mvn package**鐩存帴鎵撳寘鎴恓ar鍖咃紝閫氳繃`java -jar jtt1078-video-server-1.0-SNAPSHOT.jar`鍗冲彲杩愯锛屾渶濂芥妸**app.properties**鍜�**multimedia.html**涓�骞舵斁鍦ㄥ悓涓�涓洰褰曚笅锛屽洜涓洪」鐩細浼樺厛璇诲彇鏂囦欢绯荤粺涓殑閰嶇疆鏂囦欢淇℃伅銆傝�屽鏋滄病鏈夋湰鍦版祴璇曠殑闇�姹傦紝**multimedia.html**鍙互涓嶈銆� - -### 娉ㄦ剰浜嬮」 -1. 鏈」鐩负JT 1078鍗忚鐨勬祦濯掍綋鏈嶅姟鍣ㄩ儴鍒嗙殑瀹炵幇锛屼笉鍖呮嫭1078鍗忚鐨勬帶鍒舵秷鎭氦浜掗儴鍒嗭紝灏辨槸鍦�0x9101鎸囦护涓嬪彂鍚庯紝缁堢杩炴帴鍒扮殑闊宠棰戞湇鍔″櫒鐨勫疄鐜般�� -2. 鍦ㄤ竴鑸殑娴忚鍣ㄩ噷锛屾瘮濡侰hrome涓嬶紝娴忚鍣ㄩ檺鍒朵簡瀵逛簬鍚屼竴涓煙鍚嶇殑杩炴帴鏈�澶氬彧鑳藉鏈�6涓苟鍙戯紝鎵�浠ュ鏋滆鍚屾椂鎾斁澶氳矾瑙嗛锛岄渶瑕佸噯澶囧涓煙鍚嶆垨鏄鍙o紝閫氳繃杞惊鍒嗛厤鐨勬柟寮忥紝鎶婅棰戠殑浼犺緭杩炴帴锛屽垎閰嶅埌涓嶅悓鐨刄RL涓婂幓銆� - -### 鑷磋阿 -鏈」鐩竴寮�濮嬪彧鏄釜绠�鍗曠殑绀轰緥椤圭洰锛屽湪寮�婧愩�佸缓绔婹Q浜ゆ祦缇ゅ悗锛屽緱鍒颁簡澶ф壒鐨勫悓閬撲腑浜虹殑甯姪鍜屾敮鎸侊紝鍦ㄦ琛ㄧず璋㈡剰銆傛湰椤圭洰灏氭湭瀹屽叏瀹屽杽锛岄潪甯搁珮鍏磋兘澶熸湁鏇村鐨勬湅鍙嬩竴璧峰姞鍏ヨ繘鏉ワ紝涓�璧锋彁鍑烘洿鍔犻棯浜殑鎯虫硶锛屽缓璁炬洿鍔犲己澶х殑瑙嗛鐩戞帶骞冲彴锛� - -### 鑷磋阿鍚嶅崟 -闈炲父鎰熻阿浠ヤ笅缃戝弸鐨勫府鍔╁拰鏀寔锛屼互鍙婂叾浠栭粯榛樻敮鎸佺殑鏈嬪弸浠紒 -* 涓嶅哺涓嶅悕 -* 鏁呬簨~ -* 灏忛粍鐡滆鍚冮キ -* yedajiang44.com锛圼github.com/yedajiang44](https://github.com/yedajiang44)锛� -* 骞哥涓�瀹氬己 -* minigps-鍩虹珯瀹氫綅鏈嶅姟 -* 鎱㈡參 -* power LXC -* 濂庢潨 -* 瀛ゅ嘲璧忔湀/hx锛圼github/jelycom](https://github.com/jelycom)锛� -* 娲涘锛圼cuiyaonan](https://gitee.com/cuiyaonan2000)锛� -* tmyam - -### 鎺ㄨ崘缇ゅ弸椤圭洰 -|椤圭洰|URL|浣滆�厊璇存槑| -|---|---|---|---| -|JT1078|https://github.com/yedajiang44/JT1078|SmallChi/[yedajiang44](https://github.com/yedajiang44)|C#锛屾敮鎸侀煶瑙嗛锛岄�氳繃websocket浼犺緭flv鍒板墠绔瘄 - -### 浜ゆ祦璁ㄨ -QQ缇わ細808432702锛屽姞鍏ユ垜浠紝缇ら噷鏈夌儹蹇冪殑鍚岄亾涓汉銆佺浉鍏宠祫鏂欍�佹祴璇曟暟鎹�佷唬鐮佷互鍙婂悇绉嶆柟妗堢殑鍏堣鑰呯瓑鐫�浣犮�� - -### 鎹愬姪 -寮�婧愪笉鏄擄紝璇锋垜鎶芥敮鑺欒搲鐜嬪惂銆� - -<img src="./doc/donate.png" /> \ No newline at end of file diff --git a/doc/JTT 1078-2016.pdf b/doc/JTT 1078-2016.pdf new file mode 100644 index 0000000..6bd709b --- /dev/null +++ b/doc/JTT 1078-2016.pdf Binary files differ diff --git a/pom.xml b/pom.xml index b993b7c..acb4797 100644 --- a/pom.xml +++ b/pom.xml @@ -37,11 +37,11 @@ <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>${slf4j.version}</version> - </dependency> +<!-- <dependency>--> +<!-- <groupId>org.slf4j</groupId>--> +<!-- <artifactId>slf4j-log4j12</artifactId>--> +<!-- <version>${slf4j.version}</version>--> +<!-- </dependency>--> <dependency> <groupId>net.sf.json-lib</groupId> @@ -54,6 +54,35 @@ <groupId>de.sciss</groupId> <artifactId>jump3r</artifactId> <version>1.0.5</version> + </dependency> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>1.18.34</version> + <scope>provided</scope> + </dependency> + <!-- hutools宸ュ叿浠ユ潵 --> + <dependency> + <groupId>cn.hutool</groupId> + <artifactId>hutool-all</artifactId> + <version>5.2.0</version> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + <version>2.3.12.RELEASE</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <!--websocket渚濊禆--> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-websocket</artifactId> + <version>2.3.12.RELEASE</version> </dependency> </dependencies> @@ -70,7 +99,7 @@ <archive> <manifest> <!-- 姝ゅ鎸囧畾main鏂规硶鍏ュ彛鐨刢lass --> - <mainClass>cn.org.hentai.jtt1078.app.VideoServerApp</mainClass> + <mainClass>cn.org.hentai.jtt1078.app.ServerApp</mainClass> </manifest> </archive> </configuration> @@ -79,7 +108,7 @@ <id>make-assembly</id> <phase>package</phase> <goals> - <goal>single</goal> + <goal>assembly</goal> </goals> </execution> </executions> diff --git a/src/main/java/cn/org/hentai/jtt1078/Jtt1078Application.java b/src/main/java/cn/org/hentai/jtt1078/Jtt1078Application.java new file mode 100644 index 0000000..cbf0dc6 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/Jtt1078Application.java @@ -0,0 +1,12 @@ +package cn.org.hentai.jtt1078; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Jtt1078Application { + + public static void main(String[] args) throws Exception { + SpringApplication.run(Jtt1078Application.class, args); + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/app/HttpServer.java b/src/main/java/cn/org/hentai/jtt1078/app/HttpServer.java new file mode 100644 index 0000000..40ab365 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/app/HttpServer.java @@ -0,0 +1,65 @@ +package cn.org.hentai.jtt1078.app; + +import cn.org.hentai.jtt1078.http.GeneralResponseWriter; +import cn.org.hentai.jtt1078.http.NettyHttpServerHandler; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.net.InetAddress; + +@Slf4j +@Component +public class HttpServer { + private ServerBootstrap serverBootstrap; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + @Value("${jt1078.http.port}") + private int port; + + public void start() throws Exception { + bossGroup = new NioEventLoopGroup(1); // acceptor thread + workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()); + + serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) { + ch.pipeline().addLast( + new GeneralResponseWriter(), + new HttpResponseEncoder(), + new HttpRequestDecoder(), + new HttpObjectAggregator(1024 * 64), + new NettyHttpServerHandler() + ); + } + }) + .option(ChannelOption.SO_BACKLOG, 1024) + .childOption(ChannelOption.SO_KEEPALIVE, true); + + ChannelFuture f = serverBootstrap.bind(InetAddress.getByName("0.0.0.0"), port).sync(); + log.info("馃寪 Web Server started on port {}", port); + } + + @PreDestroy + public void shutdown() { + try { + if (workerGroup != null) workerGroup.shutdownGracefully(); + if (bossGroup != null) bossGroup.shutdownGracefully(); + log.info("HTTP Server shutdown gracefully"); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/app/ServerApp.java b/src/main/java/cn/org/hentai/jtt1078/app/ServerApp.java new file mode 100644 index 0000000..582b6ae --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/app/ServerApp.java @@ -0,0 +1,34 @@ +package cn.org.hentai.jtt1078.app; + +import cn.org.hentai.jtt1078.publisher.PublishManager; +import cn.org.hentai.jtt1078.server.SessionManager; +import cn.org.hentai.jtt1078.util.Configs; +import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +@Component +public class ServerApp { + + @Resource + private VideoServer videoServer; + @Resource + private HttpServer httpServer; + @Resource + private TalkServer talkServer; + @PostConstruct + public void start() throws Exception { + // 鍒濆鍖栭厤缃�佺鐞嗗櫒锛堝彲鎻愬彇鎴� ConfigService 鐢� Spring 绠$悊锛� + Configs.init("/app.properties"); + PublishManager.init(); + SessionManager.init(); + videoServer.start(); + httpServer.start(); + talkServer.start(); + // 娉ㄥ唽浼橀泤鍏抽棴锛堜篃鍙敤 Spring 鐨� DisposableBean銆丂PreDestroy锛� + sun.misc.Signal.handle(new sun.misc.Signal("TERM"), signal -> { + videoServer.shutdown(); + httpServer.shutdown(); + }); + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/app/TalkServer.java b/src/main/java/cn/org/hentai/jtt1078/app/TalkServer.java new file mode 100644 index 0000000..5f13809 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/app/TalkServer.java @@ -0,0 +1,57 @@ +package cn.org.hentai.jtt1078.app; + +import cn.org.hentai.jtt1078.server.audio.Jt1078AudioEncoder; +import cn.org.hentai.jtt1078.server.talk.Jt1078TalkHandler; +import cn.org.hentai.jtt1078.server.talk.Jt1078TalkMessageDecoder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; + +@Slf4j +@Component +public class TalkServer { + + + @Value("${jt1078.talk.port}") + private int port; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + + public void start() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new Jt1078TalkMessageDecoder());//瑙g爜鍣� + p.addLast(new Jt1078AudioEncoder());//闊抽缂栫爜鍣� + p.addLast(new Jt1078TalkHandler());//璇煶瀵硅澶勭悊鍣� + } + }); + + Channel ch = bootstrap.bind(port).sync().channel(); + log.info("馃帳 Talk Server started on port {}", port); + } + + @PreDestroy + public void shutdown() { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/app/Test.java b/src/main/java/cn/org/hentai/jtt1078/app/Test.java deleted file mode 100644 index 430d33d..0000000 --- a/src/main/java/cn/org/hentai/jtt1078/app/Test.java +++ /dev/null @@ -1,19 +0,0 @@ -package cn.org.hentai.jtt1078.app; - -public class Test { - public static void main(String[] args) { - byte[] buffer = { - 48, 49, 99, 100, -127, 98, 0, 0, 0, 0, 0, 0, 1, 51, 7, 68, 67, -128, 1, 1, 0, 0, 1, -104, 53, -33, -2, 57, 0, 0, 0, 100, 3, -74, 0, 0, 0, 1, 103, 77, 0, 10, -106, 84, 2, -128, 45, -120, 0, 0, 0, 1, 104, -18, 60, -128, 0, 0, 0, 1, 101, -120, -128, 64, 5, -1, -96, -32, 64, 111, 81}; - - /* byte[] buffer = { - 48, 49, 99, 100, -127, - 98, 0, 2, 2, 2, - 50,80, 2, 32, 1, - 3,0, 0, 1, -104, 55, -112, 118, 111, 0, 0, 0, 0, 3, -74, 114, 66, 70, -114, -16, -109, 45, 86, -20,};*/ - int position = 28; - int h = buffer[position] & 0xff; - int l = buffer[position + 1] & 0xff; - int result = ((h << 8) | l) & 0xffff; - System.out.println("result = " + result); - } -} diff --git a/src/main/java/cn/org/hentai/jtt1078/app/VideoServer.java b/src/main/java/cn/org/hentai/jtt1078/app/VideoServer.java new file mode 100644 index 0000000..6c3df47 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/app/VideoServer.java @@ -0,0 +1,61 @@ +package cn.org.hentai.jtt1078.app; + +import cn.org.hentai.jtt1078.server.video.Jtt1078Handler; +import cn.org.hentai.jtt1078.server.video.Jtt1078MessageDecoder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Slf4j +@Component +public class VideoServer { + + private ServerBootstrap serverBootstrap; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + @Value("${jt1078.video.port}") + private int port; + + @PreDestroy + public void destroy() { + shutdown(); + } + + public void start() throws Exception { + serverBootstrap = new ServerBootstrap(); + serverBootstrap.option(ChannelOption.SO_BACKLOG, 102400); + bossGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()); + workerGroup = new NioEventLoopGroup(); + serverBootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(final SocketChannel channel) { + ChannelPipeline p = channel.pipeline(); + p.addLast(new Jtt1078MessageDecoder()); + p.addLast(new Jtt1078Handler()); + } + }); + + Channel ch = serverBootstrap.bind("0.0.0.0", port).sync().channel(); + log.info("馃帴 Video Server started on port {}", port); + // 涓嶈 ch.closeFuture().sync()锛屼細闃诲 + } + + public void shutdown() { + try { + if (bossGroup != null) bossGroup.shutdownGracefully(); + if (workerGroup != null) workerGroup.shutdownGracefully(); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/codec/ADPCMCodec.java b/src/main/java/cn/org/hentai/jtt1078/codec/ADPCMCodec.java index 1d2e32a..5c0b99b 100644 --- a/src/main/java/cn/org/hentai/jtt1078/codec/ADPCMCodec.java +++ b/src/main/java/cn/org/hentai/jtt1078/codec/ADPCMCodec.java @@ -1,14 +1,12 @@ package cn.org.hentai.jtt1078.codec; -import cn.org.hentai.jtt1078.server.Jtt1078Decoder; -import cn.org.hentai.jtt1078.util.ByteHolder; +import cn.org.hentai.jtt1078.server.video.Jtt1078Decoder; import cn.org.hentai.jtt1078.util.Packet; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.util.Arrays; /** * Created by houcheng on 2019-12-05. diff --git a/src/main/java/cn/org/hentai/jtt1078/codec/G711UCodec.java b/src/main/java/cn/org/hentai/jtt1078/codec/G711UCodec.java index cafd151..f7d5c2b 100644 --- a/src/main/java/cn/org/hentai/jtt1078/codec/G711UCodec.java +++ b/src/main/java/cn/org/hentai/jtt1078/codec/G711UCodec.java @@ -209,8 +209,6 @@ int len = -1; byte[] buff = new byte[320]; AudioCodec codec = new G711UCodec(); - - //ByteArrayOutputStream baos = new ByteArrayOutputStream(4096 * 1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(4096 * 1024); while ((len = fis.read(buff)) > -1) { diff --git a/src/main/java/cn/org/hentai/jtt1078/entity/Analyze.java b/src/main/java/cn/org/hentai/jtt1078/entity/Analyze.java new file mode 100644 index 0000000..cd7a764 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/entity/Analyze.java @@ -0,0 +1,18 @@ +package cn.org.hentai.jtt1078.entity; + +import lombok.Data; + +@Data +public class Analyze { + private byte pt; + private short sn; + private String sim; + private byte ch; + private byte dt; + private byte fi; + private long timestamp; + private short lastIInterval; + private short lastInterval; + private short dataLen; + private byte[] data; +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/entity/AudioSendData.java b/src/main/java/cn/org/hentai/jtt1078/entity/AudioSendData.java new file mode 100644 index 0000000..7689148 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/entity/AudioSendData.java @@ -0,0 +1,33 @@ +package cn.org.hentai.jtt1078.entity; + +import cn.org.hentai.jtt1078.entity.enums.ProtocolVersion; +import lombok.Data; + +@Data +public class AudioSendData { + // 璁惧SIM鍗″彿 + private String sim; + // 閫昏緫閫氶亾鍙� + private byte channel; + // 搴忓垪鍙� + private short sequenceNumber; + + // 鍘熷闊抽鏁版嵁 + private byte[] audioBytes; + + //鍗忚鐗堟湰 + private ProtocolVersion protocolVersion; // 2016 鎴� 2019 + + public AudioSendData(String sim, byte channel, byte[] audioBytes,ProtocolVersion protocolVersion) { + this.sim = sim; + this.channel = channel; + this.audioBytes = audioBytes; + this.sequenceNumber = generateSequence(); + this.protocolVersion = protocolVersion; + } + + private static short sequence = 0; + private static synchronized short generateSequence() { + return sequence++; + } +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/entity/Jt1078Message.java b/src/main/java/cn/org/hentai/jtt1078/entity/Jt1078Message.java new file mode 100644 index 0000000..8f30011 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/entity/Jt1078Message.java @@ -0,0 +1,64 @@ +package cn.org.hentai.jtt1078.entity; + +import cn.org.hentai.jtt1078.util.TimeUtils; +import lombok.*; + +/** + * Description: JTT1078鍗忚娑堟伅瀹炰綋绫� + * Author: lsy + * Date: 2025骞�07鏈�30鏃�16:26 + **/ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ToString +public class Jt1078Message { + // 鍖呭ご鏍囧織 (0x30316364) + private int header; + // 鐗堟湰/濉厖/鎵╁睍/CSRC璁℃暟 (V/P/X/CC) + private byte vpxcc; + // 鏍囪浣嶅拰璐熻浇绫诲瀷 (M/PT) + private byte mpt; + // 鍖呭簭鍙� + private short sequence; + // SIM鍗″彿 (BCD缂栫爜) + private String sim; + // 閫昏緫閫氶亾鍙� + private byte channel; + // 鏁版嵁绫诲瀷 (4bit) + 鍒嗗寘澶勭悊鏍囪 (4bit) + private byte dataTypeAndSpm; + // 鏃堕棿鎴� (8瀛楄妭) + private byte[] timestamp; + // 涓庝笂涓�鍏抽敭甯х殑鐩稿鏃堕棿 + private short lastIframeInterval; + // 涓庝笂涓�甯х殑鐩稿鏃堕棿 + private short lastFrameInterval; + // 鏁版嵁浣撻暱搴� + private short length; + // 闊宠棰戞暟鎹� + private byte[] data; + + // 瑙f瀽鍚庣殑瀛楁锛堟柟渚夸娇鐢級 + private int dataType; // 鏁版嵁绫诲瀷 + private int spm; // 鍒嗗寘澶勭悊鏍囪 + private String timestampStr; // 鏍煎紡鍖栨椂闂村瓧绗︿覆 + private long timestampLong; // 鏁板�肩被鍨嬫椂闂存埑 + private int pt; // 璐熻浇绫诲瀷锛圡/PT瀛楁鐨勪綆7浣嶏級 + public void parseExtendedFields() { + // 瑙f瀽M/PT瀛楁 + int m = (this.mpt >> 7) & 0x01; + int pt = this.mpt & 0x7F; + + // 瑙f瀽鏁版嵁绫诲瀷鍜屽垎鍖呮爣璁� + this.dataType = (this.dataTypeAndSpm >> 4) & 0x0F; + this.spm = this.dataTypeAndSpm & 0x0F; + + // 瑙f瀽鏃堕棿鎴� + if (this.timestamp != null && this.timestamp.length == 8) { + this.timestampStr = TimeUtils.bytes8ToTimeString(this.timestamp); + this.timestampLong = TimeUtils.bytes8ToLong(this.timestamp); + } + } + +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/entity/enums/ProtocolVersion.java b/src/main/java/cn/org/hentai/jtt1078/entity/enums/ProtocolVersion.java new file mode 100644 index 0000000..c2ec17f --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/entity/enums/ProtocolVersion.java @@ -0,0 +1,7 @@ +package cn.org.hentai.jtt1078.entity.enums; + +public enum ProtocolVersion { + UNKNOWN, + V2013, // 瀵瑰簲16 + V2019 // 瀵瑰簲19 +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/publisher/Channel.java b/src/main/java/cn/org/hentai/jtt1078/publisher/Channel.java index 2cfd125..cd93a97 100644 --- a/src/main/java/cn/org/hentai/jtt1078/publisher/Channel.java +++ b/src/main/java/cn/org/hentai/jtt1078/publisher/Channel.java @@ -41,7 +41,7 @@ this.flvEncoder = new FlvEncoder(true, true); this.buffer = new ByteHolder(2048 * 100); - if (StringUtils.isEmpty(Configs.get("rtmp.url")) == false) + if (!StringUtils.isEmpty(Configs.get("rtmp.url"))) { rtmpPublisher = new RTMPPublisher(tag); rtmpPublisher.start(); diff --git a/src/main/java/cn/org/hentai/jtt1078/server/audio/Jt1078AudioEncoder.java b/src/main/java/cn/org/hentai/jtt1078/server/audio/Jt1078AudioEncoder.java new file mode 100644 index 0000000..5f5421a --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/server/audio/Jt1078AudioEncoder.java @@ -0,0 +1,121 @@ +package cn.org.hentai.jtt1078.server.audio; + +import cn.org.hentai.jtt1078.entity.AudioSendData; +import cn.org.hentai.jtt1078.entity.enums.ProtocolVersion; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import lombok.extern.slf4j.Slf4j; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +@Slf4j +public class Jt1078AudioEncoder extends MessageToByteEncoder<AudioSendData> { + + @Override + protected void encode(ChannelHandlerContext ctx, AudioSendData audioData, ByteBuf out) throws Exception { + try { + // 1. 杞崲闊抽鏍煎紡涓篜CM + byte[] pcmData = audioData.getAudioBytes(); + + // 2. 缂栫爜涓篏.711 A-law + byte[] g711Data = encodePcmToG711A(pcmData); + + // 3. 鏋勫缓JT1078鍗忚鍖� + buildJt1078Packet(out, audioData, g711Data); + + } catch (Exception e) { + log.error("闊抽缂栫爜澶辫触", e); + throw e; + } + } + + + private byte[] encodePcmToG711A(byte[] pcmData) { + byte[] alawData = new byte[pcmData.length / 2]; + ByteBuffer buffer = ByteBuffer.wrap(pcmData); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + for (int i = 0; i < alawData.length; i++) { + short pcmSample = buffer.getShort(); + alawData[i] = linearToALaw(pcmSample); + } + return alawData; + } + + private void buildJt1078Packet(ByteBuf out, AudioSendData audioData, byte[] g711Data) { + // 1. JT1078 Header + out.writeBytes(new byte[]{0x30, 0x31, 0x63, 0x64}); + + // 2. Payload Type (06 = Audio Stream) + out.writeByte(0x81); + out.writeByte(0x86); + // 3. Sequence Number + out.writeShort(audioData.getSequenceNumber()); + + // 4. SIM (BCD, 10 bytes) + int simLength = (audioData.getProtocolVersion() == ProtocolVersion.V2013) ? 6 : 10; + byte[] simBcd = encodeSimToBcd(audioData.getSim(), simLength); + // byte[] simBcd = encodeSimToBcd(audioData.getSim(), 6); // 鍙兘闇�瑕佹洿闀� + out.writeBytes(simBcd); + + // 5. Channel + out.writeByte(audioData.getChannel()); + + // 6. Data Type (Audio) + Frame Info (Atomic) + out.writeByte(0x30); + + // 7. Timestamp (8 bytes) + out.writeLong(System.currentTimeMillis()); + + // 8. Data Length (2 bytes) + out.writeShort(g711Data.length); + + // 9. Audio Data (G.711 A-law) + out.writeBytes(g711Data); + + // 鎵撳嵃鍗佸叚杩涘埗瀛楃涓诧紙鎺ㄨ崘锛� + //log.info("鍙戦�佺殑 JT1078 鎶ユ枃锛圚ex锛�: {}", ByteBufUtil.hexDump(out)); + + } + + + private byte[] encodeSimToBcd(String sim, int length) { + // BCD缂栫爜姣忎釜瀛楄妭瀛�2浣嶆暟瀛楋紝鍥犳闇�瑕�2鍊嶉暱搴� + int digitCount = length * 2; + + // 琛ュ叏鎴栨埅鏂璖IM鍗″彿 + String normalizedSim = sim.length() > digitCount + ? sim.substring(0, digitCount) + : String.format("%-" + digitCount + "s", sim).replace(' ', '0'); + + byte[] bcd = new byte[length]; + for (int i = 0; i < digitCount; i += 2) { + int high = Character.digit(normalizedSim.charAt(i), 10); + int low = Character.digit(normalizedSim.charAt(i + 1), 10); + bcd[i / 2] = (byte) ((high << 4) | low); + } + return bcd; + } + + + private byte linearToALaw(short pcm) { + // G.711 A-law缂栫爜绠楁硶瀹炵幇 + int sign = (pcm & 0x8000) >> 8; + if (sign != 0) { + pcm = (short) -pcm; + } + if (pcm > 32767) pcm = 32767; + + int exponent = 7; + int mask = 0x4000; + while ((pcm & mask) == 0 && exponent > 0) { + exponent--; + mask >>= 1; + } + + int mantissa = (pcm >> (exponent == 0 ? 4 : exponent + 3)) & 0x0F; + byte alaw = (byte) (sign | (exponent << 4) | mantissa); + return (byte) (alaw ^ 0xD5); + } +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/server/audio/Jt1078AudioSenderService.java b/src/main/java/cn/org/hentai/jtt1078/server/audio/Jt1078AudioSenderService.java new file mode 100644 index 0000000..dad08b9 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/server/audio/Jt1078AudioSenderService.java @@ -0,0 +1,76 @@ +package cn.org.hentai.jtt1078.server.audio; + +import cn.org.hentai.jtt1078.entity.AudioSendData; +import cn.org.hentai.jtt1078.entity.enums.ProtocolVersion; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.PreDestroy; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class Jt1078AudioSenderService { + + // 璁惧閫氶亾缂撳瓨 (SIM -> Channel) + public static Map<String, Channel> deviceChannels = new ConcurrentHashMap<>(); + + // 闊抽鍙戦�佺嚎绋嬫睜 + private static final ExecutorService audioSendExecutor = Executors.newFixedThreadPool(4); + + /** + * 娉ㄥ唽璁惧閫氶亾 + */ + public static void registerChannel(String sim, Channel channel) { + if (!deviceChannels.containsKey(sim)) { + deviceChannels.put(sim, channel); + log.info("瀵硅閫氶亾娉ㄥ唽: SIM={}", sim); + } + } + + /** + * 绉婚櫎璁惧閫氶亾 + */ + public static void removeChannel(String sim) { + deviceChannels.remove(sim); + log.info("瀵硅閫氶亾绉婚櫎: SIM={}", sim); + } + + /** + * 鍙戦�侀煶棰戝埌璁惧 + */ + public static void sendAudio(String sim, int channel, byte[] audioData,ProtocolVersion protocolVersion) { + audioSendExecutor.execute(() -> { + Channel deviceChannel = deviceChannels.get(sim); + if (deviceChannel == null || !deviceChannel.isActive()) { + log.warn("瀵硅閫氶亾鏈敞鍐�: SIM={}", sim); + return; + } + AudioSendData sendData = new AudioSendData(sim, (byte)channel, audioData,protocolVersion); + ChannelFuture future = deviceChannel.writeAndFlush(sendData); + + future.addListener(f -> { + if (!f.isSuccess()) { + log.error("闊抽鍙戦�佸け璐�: SIM={}", sim, f.cause()); + } + }); + }); + } + + @PreDestroy + public void shutdown() { + audioSendExecutor.shutdown(); + try { + if (!audioSendExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + audioSendExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + audioSendExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/server/backup/Jt1078Handler.java b/src/main/java/cn/org/hentai/jtt1078/server/backup/Jt1078Handler.java new file mode 100644 index 0000000..5edf30f --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/server/backup/Jt1078Handler.java @@ -0,0 +1,76 @@ +package cn.org.hentai.jtt1078.server.backup; + +import cn.org.hentai.jtt1078.entity.Jt1078Message; +import cn.org.hentai.jtt1078.publisher.PublishManager; +import cn.org.hentai.jtt1078.server.SessionManager; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by matrixy on 2019/4/9. + */ +public class Jt1078Handler extends SimpleChannelInboundHandler<Jt1078Message> { + static Logger logger = LoggerFactory.getLogger(Jt1078Handler.class); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Jt1078Message msg) { + String sim = msg.getSim(); + int channel = msg.getChannel(); + String tag = sim + "-" + channel; + int sequence = msg.getSequence(); + long timestamp = msg.getTimestampLong(); + int pt = msg.getPt(); + byte[] data = msg.getData(); + if(data==null) + { + return; + } + int dataType = msg.getDataType(); + int spm = msg.getSpm(); + + + if (dataType == 0x00 || dataType == 0x01 || dataType == 0x02) { + PublishManager.getInstance().publishVideo(tag, sequence, timestamp, pt, data); + } else if (dataType == 0x03) { + PublishManager.getInstance().publishAudio(tag, sequence, timestamp, pt, data); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + release(ctx.channel()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + // super.exceptionCaught(ctx, cause); + cause.printStackTrace(); + release(ctx.channel()); + ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state() == IdleState.READER_IDLE) { + String tag = SessionManager.get(ctx.channel(), "tag"); + logger.info("read timeout: {}", tag); + release(ctx.channel()); + } + } + } + + private void release(io.netty.channel.Channel channel) { + String tag = SessionManager.get(channel, "tag"); + if (tag != null) { + logger.info("close netty channel: {}", tag); + PublishManager.getInstance().close(tag); + } + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/server/backup/Jt1078MessageDecoder.java b/src/main/java/cn/org/hentai/jtt1078/server/backup/Jt1078MessageDecoder.java new file mode 100644 index 0000000..93b7e1c --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/server/backup/Jt1078MessageDecoder.java @@ -0,0 +1,145 @@ +package cn.org.hentai.jtt1078.server.backup; + +import cn.org.hentai.jtt1078.entity.Jt1078Message; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class Jt1078MessageDecoder extends ByteToMessageDecoder { + private static final int HEADER_MIN_LEN = 34; + private static final int MAX_SKIP_BYTES = 4096; // 闃查敊鏈哄埗 + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { + int skippedBytes = 0; + + while (true) { + if (in.readableBytes() < 4) return; + + in.markReaderIndex(); + int header = in.readInt(); + + if ((header & 0x7FFFFFFF) != 0x30316364) { + in.resetReaderIndex(); + in.readByte(); // 婊戝姩绐楀彛缁х画鏌ユ壘鍖呭ご + skippedBytes++; + if (skippedBytes >= MAX_SKIP_BYTES) { + log.error("Too many invalid bytes ({}), close connection", skippedBytes); + ctx.close(); + return; + } + continue; + } + + // mark 鍖呭ご瀵归綈浣嶇疆 + in.markReaderIndex(); + + // 淇濊瘉鑷冲皯璇诲畬鍩烘湰瀛楁 + if (in.readableBytes() < 1 + 1 + 2) { + in.resetReaderIndex(); + return; + } + + byte vpxcc = in.readByte(); + byte mpt = in.readByte(); + short seq = in.readShort(); + + // 鍏堝垽鏂崗璁増鏈� + boolean isV2019 = detectProtocolVersion(in); + int simLength = isV2019 ? 10 : 6; + + // 鍒ゆ柇鎬诲墿浣欐暟鎹槸鍚﹁冻澶燂紙SIM 鍗� + 閫氶亾 + 鏁版嵁绫诲瀷 + 鏃堕棿鎴� + iframe/frame interval + 闀垮害瀛楁锛� + if (in.readableBytes() < simLength + 1 + 1 + 8 + 2 + 2 + 2) { + in.resetReaderIndex(); + return; + } + + byte[] simBytes = new byte[simLength]; + in.readBytes(simBytes); + String sim = bcdToString(simBytes); + + byte channel = in.readByte(); + byte dataTypeByte = in.readByte(); + + byte[] timestampBytes = new byte[8]; + in.readBytes(timestampBytes); + short lastIframeInterval = in.readShort(); + short lastFrameInterval = in.readShort(); + + short dataLength = in.readShort(); + if (dataLength < 0) { + log.warn("Invalid dataLength: {}", dataLength); + in.resetReaderIndex(); + in.readByte(); // 璺�1瀛楄妭鍐嶉噸鏂版煡鎵惧寘澶� + skippedBytes++; + continue; + } + + if (in.readableBytes() < dataLength) { + in.resetReaderIndex(); + return; // 绛夊緟鏇村鏁版嵁 + } + + byte[] data = new byte[dataLength]; + in.readBytes(data); + + Jt1078Message message = new Jt1078Message(); + message.setHeader(header); + message.setVpxcc(vpxcc); + message.setMpt(mpt); + message.setSequence(seq); + message.setSim(sim); + message.setChannel(channel); + message.setDataTypeAndSpm(dataTypeByte); + message.setTimestamp(timestampBytes); + message.setLastIframeInterval(lastIframeInterval); + message.setLastFrameInterval(lastFrameInterval); + message.setLength(dataLength); + message.parseExtendedFields(); + + out.add(message); + log.warn("Parsed packet: {}", message.toString()); + } + } + + + private boolean detectProtocolVersion(ByteBuf in) { + // 淇濆瓨鍘熷璇诲彇浣嶇疆 + in.markReaderIndex(); + + try { + // 2016鍗忚SIM鍗″彿浣嶇疆锛氬寘澶�4 + VPXCC1 + MPT1 + SEQ2 = 8瀛楄妭 + // 鎵�浠IM鍗″彿鍦�8-13瀛楄妭锛�6瀛楄妭锛� + // 2019鍗忚SIM鍗″彿鍦�8-17瀛楄妭锛�10瀛楄妭锛� + + // 璺冲埌SIM鍗″彿璧峰浣嶇疆 + in.skipBytes(8); + + // 妫�鏌ョ14-17瀛楄妭锛�2016鍗忚杩欓儴鍒嗘槸閫氶亾鍙�/鏁版嵁绫诲瀷锛�2019鍗忚鏄疭IM鍗$户缁級 + byte[] probeBytes = new byte[4]; + in.readBytes(probeBytes); + + // BCD鐮佹湁鏁堟�ф鏌ワ細姣忓崐涓瓧鑺傚簲鍦�0-9涔嬮棿 + for (byte b : probeBytes) { + if (((b & 0xF0) >>> 4) > 9 || ((b & 0x0F) > 9)) { + return true; // 鍙戠幇闈濨CD鐮侊紝鍙兘鏄�2019鍗忚 + } + } + return false; + } finally { + // 鏃犺妫�娴嬫垚鍔熶笌鍚︼紝閮介噸缃鍙栦綅缃� + in.resetReaderIndex(); + } + } + private String bcdToString(byte[] bcd) { + StringBuilder sb = new StringBuilder(); + for (byte b : bcd) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkDecoder.java b/src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkDecoder.java new file mode 100644 index 0000000..2f30b33 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkDecoder.java @@ -0,0 +1,422 @@ +package cn.org.hentai.jtt1078.server.talk; + +import cn.org.hentai.jtt1078.entity.Analyze; +import cn.org.hentai.jtt1078.entity.enums.ProtocolVersion; +import cn.org.hentai.jtt1078.util.ByteHolder; +import cn.org.hentai.jtt1078.util.ByteUtils; +import cn.org.hentai.jtt1078.util.G711Util; +import cn.org.hentai.jtt1078.util.Packet; +import cn.org.hentai.jtt1078.websocket.MyWebSocketHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.sound.sampled.AudioFormat; +import javax.sound.sampled.SourceDataLine; +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +/** + * JTT1078 Protocol Decoder + */ +@Slf4j +@Component +public class Jt1078TalkDecoder { + private final ByteHolder buffer = new ByteHolder(4096); + private static final byte[] HEAD1078 = {0x30, 0x31, 0x63, 0x64}; + private ProtocolVersion protocolVersion = ProtocolVersion.UNKNOWN; + + public void write(byte[] block) { + buffer.write(block); + } + + public void write(byte[] block, int startIndex, int length) { + byte[] buff = new byte[length]; + System.arraycopy(block, startIndex, buff, 0, length); + write(buff); + } + + public Packet decode() { + + if (buffer.size() < 4) return null; + + // Check header + if (!Arrays.equals(buffer.array(4), HEAD1078)) { +// log.warn("Invalid protocol header, expected:30316364, actual:{}", +// ByteUtils.toHexString(buffer.array(4))); + buffer.clear(); + return null; + } + + Packet packet; + // 绗竴娆¤嚜鍔ㄨ瘑鍒増鏈� + if (protocolVersion == ProtocolVersion.UNKNOWN) { + packet = autoDetectVersionAndParse(); + } else { + packet = decodePacket(protocolVersion); + + } + + if (packet != null) { + // 鉁� 缁熶竴澶勭悊瑙f瀽涓庨煶棰� + Analyze analyze = (protocolVersion == ProtocolVersion.V2013) + ? parse16(packet.getBytes()) + : parse19(packet.getBytes()); + processAudio(analyze); + + } + + return packet; + } + + // 鑷姩妫�娴嬬増鏈苟瑙f瀽 + private Packet autoDetectVersionAndParse() { + // 灏濊瘯 2013 鐗堟湰 + Packet packet = decodePacket(ProtocolVersion.V2013); + if (packet != null) { + protocolVersion = ProtocolVersion.V2013; + Analyze analyze = parse16(packet.getBytes()); + System.out.printf("%s-%d device started output%n", analyze.getSim(), analyze.getCh()); + processAudio(analyze); + return packet; + } + + // 灏濊瘯 2019 鐗堟湰 + packet = decodePacket(ProtocolVersion.V2019); + if (packet != null) { + protocolVersion = ProtocolVersion.V2019; + Analyze analyze = parse19(packet.getBytes()); + System.out.printf("%s-%d device started output%n", analyze.getSim(), analyze.getCh()); + processAudio(analyze); + return packet; + } + return null; + } + + // 鏍规嵁鎸囧畾鐗堟湰瑙f瀽 + private Packet decodePacket(ProtocolVersion version) { + if (version == ProtocolVersion.UNKNOWN) { + return null; + } + + int dataTypeIndex = (version == ProtocolVersion.V2013) ? 15 : 19; + if (buffer.size() < dataTypeIndex + 1) return null; + + byte dataType = (byte) ((buffer.get(dataTypeIndex) >> 4) & 0x0F); + int dataLenIndex; + + switch (dataType) { + case 3: + dataLenIndex = dataTypeIndex + 9; + break; + case 4: + dataLenIndex = dataTypeIndex + 1; + break; + default: + dataLenIndex = dataTypeIndex + 13; + } + //log.info("dataType:"+dataType); + if (buffer.size() < dataLenIndex + 2) return null; + int dataLen = buffer.getShort(dataLenIndex) & 0xFFFF; + int packetLength = dataLenIndex + 2 + dataLen; + + if (buffer.size() < packetLength) return null; + + // Check next packet header + if (buffer.size() >= packetLength + 4) { + byte[] nextHeader = new byte[4]; + for (int i = 0; i < 4; i++) { + nextHeader[i] = buffer.get(packetLength + i); + } + if (!Arrays.equals(nextHeader, HEAD1078)) { + //log.warn("Invalid data type: {}", dataType); + buffer.clear(); + return null; + } + } + + byte[] packetData = new byte[packetLength]; + buffer.sliceInto(packetData, packetLength); + + //printPacketInfo(packetData, version); + + return Packet.create(packetData); + } + + // 瑙f瀽16鐗堟湰鏁版嵁 + private Analyze parse16(byte[] data) { + Analyze analyze = new Analyze(); + analyze.setPt((byte) (data[5] & 0x7F)); + analyze.setSn((short) (((data[6] & 0xFF) << 8) | (data[7] & 0xFF))); + + StringBuilder simBuilder = new StringBuilder(); + for (int i = 0; i < 6; i++) { + simBuilder.append(nextBcd(data, 8 + i)); + } + analyze.setSim(simBuilder.toString()); + + analyze.setCh(data[14]); + analyze.setDt((byte) ((data[15] >> 4) & 0x0F)); + analyze.setFi((byte) (data[15] & 0x0F)); + + int dataLenIndex; + switch (analyze.getDt()) { + case 3: + analyze.setTimestamp(setTimestamp(data, 16)); + dataLenIndex = 24; + break; + case 4: + dataLenIndex = 16; + break; + default: + analyze.setTimestamp(setTimestamp(data, 16)); + analyze.setLastIInterval((short) (((data[24] & 0xFF) << 8) | (data[25] & 0xFF))); + analyze.setLastInterval((short) (((data[26] & 0xFF) << 8) | (data[27] & 0xFF))); + dataLenIndex = 28; + } + + analyze.setDataLen((short) (((data[dataLenIndex] & 0xFF) << 8) | (data[dataLenIndex + 1] & 0xFF))); + byte[] payload = new byte[analyze.getDataLen()]; + System.arraycopy(data, dataLenIndex + 2, payload, 0, payload.length); + analyze.setData(payload); + + return analyze; + } + + // 瑙f瀽19鐗堟湰鏁版嵁 + private Analyze parse19(byte[] data) { + Analyze analyze = new Analyze(); + analyze.setPt((byte) (data[5] & 0x7F)); + analyze.setSn((short) (((data[6] & 0xFF) << 8) | (data[7] & 0xFF))); + + StringBuilder simBuilder = new StringBuilder(); + for (int i = 0; i < 10; i++) { + simBuilder.append(nextBcd(data, 8 + i)); + } + analyze.setSim(simBuilder.toString()); + + analyze.setCh(data[18]); + analyze.setDt((byte) ((data[19] >> 4) & 0x0F)); + analyze.setFi((byte) (data[19] & 0x0F)); + + int dataLenIndex; + switch (analyze.getDt()) { + case 3: + analyze.setTimestamp(setTimestamp(data, 20)); + dataLenIndex = 28; + break; + case 4: + dataLenIndex = 20; + break; + default: + analyze.setTimestamp(setTimestamp(data, 20)); + analyze.setLastIInterval((short) (((data[28] & 0xFF) << 8) | (data[29] & 0xFF))); + analyze.setLastInterval((short) (((data[30] & 0xFF) << 8) | (data[31] & 0xFF))); + dataLenIndex = 32; + } + + analyze.setDataLen((short) (((data[dataLenIndex] & 0xFF) << 8) | (data[dataLenIndex + 1] & 0xFF))); + byte[] payload = new byte[analyze.getDataLen()]; + System.arraycopy(data, dataLenIndex + 2, payload, 0, payload.length); + analyze.setData(payload); + + return analyze; + } + + private long setTimestamp(byte[] data, int offset) { + long timestamp = 0; + for (int i = 0; i < 8; i++) { + timestamp = (timestamp << 8) | (data[offset + i] & 0xFF); + } + return timestamp; + } + + private String nextBcd(byte[] data, int offset) { + byte val = data[offset]; + int ch1 = (val >> 4) & 0x0F; + int ch2 = val & 0x0F; + return String.format("%d%d", ch1, ch2); + } + + + private void processAudio(Analyze analyze) { + if (analyze.getDt() == 3) { + try { + processAudioFrame(analyze); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + // region 鏈湴鎾斁 + private final int AUDIO_BUFFER_SIZE = 1600; // 100ms鐨勭紦鍐�(8000Hz * 16bit * 1澹伴亾 * 0.1绉� /8) + private final AudioFormat PCM_FORMAT = new AudioFormat(8000.0f, 16, 1, true, false); + private SourceDataLine audioLine; + private final BlockingQueue<byte[]> audioQueue = new ArrayBlockingQueue<>(20); // 鎺у埗缂撳啿闃熷垪澶у皬 + private volatile boolean isPlaying = false; + private Thread playbackThread; + // public Jt1078Decoder() { + // 鍒濆鍖栭煶棰戣緭鍑� + //initAudioSystem(); + //} + // 鍒濆鍖栭煶棰戠郴缁燂紙鍦ㄦ瀯閫犲嚱鏁颁腑璋冪敤锛� +// private void initAudioSystem() { +// try { +// DataLine.Info info = new DataLine.Info(SourceDataLine.class, PCM_FORMAT); +// audioLine = (SourceDataLine) AudioSystem.getLine(info); +// +// // 浣跨敤鏇村皬鐨勭紦鍐插尯鍑忓皯寤惰繜 +// audioLine.open(PCM_FORMAT, AUDIO_BUFFER_SIZE); +// audioLine.start(); +// +// // 鍚姩鎾斁绾跨▼ +// isPlaying = true; +// playbackThread = new Thread(this::playbackLoop); +// playbackThread.setDaemon(true); +// playbackThread.start(); +// +// System.out.println("闊抽绯荤粺鍒濆鍖栧畬鎴愶紝缂撳啿鍖哄ぇ灏�: " + AUDIO_BUFFER_SIZE + " bytes"); +// } catch (LineUnavailableException e) { +// System.err.println("闊抽璁惧鍒濆鍖栧け璐�: " + e.getMessage()); +// } +// } + + // 鎾斁绾跨▼涓诲惊鐜� +// private void playbackLoop() { +// byte[] silentFrame = new byte[320]; // 20ms闈欓煶甯� +// Arrays.fill(silentFrame, (byte)0); +// +// while (isPlaying) { +// try { +// byte[] audioData = audioQueue.poll(50, TimeUnit.MILLISECONDS); +// +// if (audioData != null) { +// audioLine.write(audioData, 0, audioData.length); +// } else { +// Thread.sleep(5); +// } +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// } +// } +// } +// endregion + // 鏀硅繘鐨勯煶棰戞暟鎹鐞嗘柟娉� + private void processAudioFrame(Analyze analyze) throws IOException { + if (analyze.getDt() != 3) return; // 闈為煶棰戝抚 + + byte[] rawData = analyze.getData(); + if (rawData == null || rawData.length <= 32) return; // 璺宠繃鏃犳晥甯� + + // 1. 鎻愬彇鏈夋晥璐熻浇锛堣烦杩嘕T1078澶撮儴锛� + int payloadOffset = 32; // 鏍规嵁瀹為檯鍗忚璋冩暣 + byte[] g711aPayload = Arrays.copyOfRange(rawData, payloadOffset, rawData.length); + + // 鍙戦�佺粰瀵瑰簲 sim 鐨勫墠绔� + String sim = analyze.getSim(); // 鍋囪 Analyze 瀵硅薄鏈� sim 瀛楁 + + // 2. 蹇�熻В鐮� + byte[] pcmData = G711Util.decodeG711AFast(g711aPayload); + + // 4. 鍙戦�佺粰鍓嶇 + MyWebSocketHandler.sendAudioToSim(sim, pcmData); + +// // 3. 闈為樆濉炴彁浜ゅ埌鎾斁闃熷垪 +// if (!audioQueue.offer(pcmData)) { +// audioQueue.poll(); // 涓㈠純鏈�鏃у抚锛堣�屼笉鏄� flush 杈撳嚭娴侊級 +// audioQueue.offer(pcmData); // 鍐嶆灏濊瘯鍔犲叆鏂板抚 +// } + } + + // 鍏抽棴鏃堕噴鏀捐祫婧� + public void close() { + isPlaying = false; + if (playbackThread != null) { + playbackThread.interrupt(); + } + if (audioLine != null) { + audioLine.drain(); + audioLine.close(); + } + } + + private void printPacketInfo(byte[] packet, ProtocolVersion version) { + System.out.println("Raw Packet (Hex):\n" + ByteUtils.toHexString(packet)); + + System.out.println("\n=== JTT1078 " + ((version == ProtocolVersion.V2013) ? "2013" : "2019") + " Protocol " + + "Packet ==="); + System.out.println("Header: " + ByteUtils.toHexString(packet, 0, 4)); + + Analyze analyze = (version == ProtocolVersion.V2013) ? parse16(packet) : parse19(packet); + System.out.println("Payload Type: " + analyze.getPt()); + System.out.println("Sequence Number: " + analyze.getSn()); + System.out.println("SIM: " + analyze.getSim()); + System.out.println("Channel: " + analyze.getCh()); + String dataType; + switch (analyze.getDt()) { + case 0: + dataType = "Video I Frame"; + break; + case 1: + dataType = "Video P Frame"; + break; + case 2: + dataType = "Video B Frame"; + break; + case 3: + dataType = "Audio Frame"; + break; + case 4: + dataType = "Transparent Data"; + break; + default: + dataType = "Unknown"; + break; + } + System.out.println("Data Type: " + dataType); + String FrameInfo; + switch (analyze.getFi()) { + case 0: + FrameInfo = "Atomic Packet"; + break; + case 1: + FrameInfo = "First Packet"; + break; + case 2: + FrameInfo = "Last Packet"; + break; + case 3: + FrameInfo = "Middle Packet"; + break; + default: + FrameInfo = "Unknown (" + analyze.getFi() + ")"; + break; + } + System.out.println("Frame Info: " + FrameInfo); + + if (analyze.getTimestamp() != 0) { + Instant instant = Instant.ofEpochMilli(analyze.getTimestamp()); + ZonedDateTime beijingTime = instant.atZone(ZoneOffset.ofHours(8)); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + System.out.println("Timestamp: " + formatter.format(beijingTime)); + } + + if (analyze.getLastIInterval() != 0) { + System.out.println("Last I Frame Interval: " + analyze.getLastIInterval()); + } + if (analyze.getLastInterval() != 0) { + System.out.println("Last Frame Interval: " + analyze.getLastInterval()); + } + + System.out.println("Data Length: " + analyze.getDataLen()); + System.out.println("=================================\n"); + + } +} + diff --git a/src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkHandler.java b/src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkHandler.java new file mode 100644 index 0000000..2837a88 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkHandler.java @@ -0,0 +1,154 @@ +package cn.org.hentai.jtt1078.server.talk; + +import cn.org.hentai.jtt1078.publisher.Channel; +import cn.org.hentai.jtt1078.publisher.PublishManager; +import cn.org.hentai.jtt1078.server.Session; +import cn.org.hentai.jtt1078.server.SessionManager; +import cn.org.hentai.jtt1078.server.audio.Jt1078AudioSenderService; +import cn.org.hentai.jtt1078.util.Packet; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.AttributeKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by matrixy on 2019/4/9. + */ +public class Jt1078TalkHandler extends SimpleChannelInboundHandler<Packet> +{ + static Logger logger = LoggerFactory.getLogger(Jt1078TalkHandler.class); + private static final AttributeKey<Session> SESSION_KEY = AttributeKey.valueOf("session-key"); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Packet packet) { + io.netty.channel.Channel nettyChannel = ctx.channel(); + + // 1. 鍗忚鐗堟湰妫�娴� + boolean is2019Protocol = detectProtocolVersion(packet); + int simLength = is2019Protocol ? 10 : 6; + + // 2. 璇诲彇SIM鍗″彿鍜岄�氶亾鍙� + packet.seek(8); + StringBuilder simBuilder = new StringBuilder(); + for (int i = 0; i < simLength; i++) { + simBuilder.append(packet.nextBCD()); + } + String sim = simBuilder.toString(); + + int channelPos = is2019Protocol ? 18 : 14; + packet.seek(channelPos); + int channel = packet.nextByte() & 0xff; + String tag = sim + "-" + channel; + Jt1078AudioSenderService.registerChannel(sim, nettyChannel); + // 3. 浼氳瘽绠$悊 + if (!SessionManager.contains(nettyChannel, "tag")) { + String publishKey = tag + ":talk"; // 瀵硅绔� + Channel chl = PublishManager.getInstance().open(publishKey); + SessionManager.set(nettyChannel, "tag", publishKey); + logger.info("start publishing: {} -> {}-{} (Protocol: {})", + Long.toHexString(chl.hashCode() & 0xffffffffL), + sim, channel, + is2019Protocol ? "2019" : "2016"); + } + + // 4. 鏁版嵁澶勭悊 + Integer sequence = SessionManager.get(nettyChannel, "talk:video-sequence"); + if (sequence == null) sequence = 0; + + int dataTypePos = is2019Protocol ? 19 : 15; + packet.seek(dataTypePos); + int dataType = (packet.nextByte() >> 4) & 0x0f; + int pkType = packet.nextByte() & 0x0f; + + int baseOffset = is2019Protocol ? 32 : 28; + int lengthOffset = baseOffset; + if (dataType == 0x04) lengthOffset = baseOffset - 8 - 2 - 2; + else if (dataType == 0x03) lengthOffset = baseOffset - 4; + + int pt = packet.seek(5).nextByte() & 0x7f; + int timestampOffset = is2019Protocol ? 20 : 16; + + if (dataType == 0x00 || dataType == 0x01 || dataType == 0x02) { + if (pkType == 0 || pkType == 2) { + sequence += 1; + SessionManager.set(nettyChannel, "talk-sequence", sequence); + } + long timestamp = packet.seek(timestampOffset).nextLong(); + byte[] videoData = packet.seek(lengthOffset + 2).nextBytes(); + //PublishManager.getInstance().publishVideo(tag, sequence, timestamp, pt, videoData); + } + else if (dataType == 0x03) { + long timestamp = packet.seek(timestampOffset).nextLong(); + byte[] audioData = packet.seek(lengthOffset + 2).nextBytes(); + //PublishManager.getInstance().publishAudio(tag, sequence, timestamp, pt, audioData); + } + } + + private boolean detectProtocolVersion(Packet packet) { + // 鏂规硶1锛氭鏌�6瀛楄妭SIM鍗″彿鍚庣殑濉厖 + packet.seek(8 + 6); // 2016鍗忚SIM鍗$粨鏉熶綅缃� + byte b14 = packet.nextByte(); + byte b15 = packet.nextByte(); + + // 濡傛灉鏄�2019鍗忚锛岃繖閲屽簲璇ユ槸0x00濉厖 + if ((b14 & 0xFF) == 0x00 && (b15 & 0xFF) == 0x00) { + return true; + } + + // 鏂规硶2锛氭鏌ユ暟鎹被鍨嬩綅缃殑鏈夋晥鎬� + packet.seek(15); + byte dtByte = packet.nextByte(); + int dataType = (dtByte >> 4) & 0x0F; + int frameType = dtByte & 0x0F; + + // 濡傛灉鏁版嵁绫诲瀷鎴栧抚绫诲瀷鏃犳晥锛屽彲鑳芥槸2019鍗忚 + if (dataType > 4 || frameType > 3) { + return true; + } + + // 榛樿杩斿洖2016鍗忚 + return false; + } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception + { + super.channelInactive(ctx); + release(ctx.channel()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception + { + // super.exceptionCaught(ctx, cause); + cause.printStackTrace(); + release(ctx.channel()); + ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state() == IdleState.READER_IDLE) { + String tag = SessionManager.get(ctx.channel(), "tag"); + logger.info("read timeout: {}",tag); + release(ctx.channel()); + } + } + } + + private void release(io.netty.channel.Channel channel) + { + String tag = SessionManager.get(channel, "tag"); + if (tag != null) + { + logger.info("close netty channel: {}", tag); + // PublishManager.getInstance().close(tag); + String sim = tag.split("-")[0]; // 鍙栧嚭SIM鍗″彿閮ㄥ垎 + Jt1078AudioSenderService.removeChannel(sim); + } + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkMessageDecoder.java b/src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkMessageDecoder.java new file mode 100644 index 0000000..5e48d81 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/server/talk/Jt1078TalkMessageDecoder.java @@ -0,0 +1,35 @@ +package cn.org.hentai.jtt1078.server.talk; + +import cn.org.hentai.jtt1078.util.Packet; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +/** + * Created by matrixy on 2019/4/9. + */ +public class Jt1078TalkMessageDecoder extends ByteToMessageDecoder { + byte[] block = new byte[4096]; + + private final Jt1078TalkDecoder decoder = new Jt1078TalkDecoder(); + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { + int length = in.readableBytes(); + for (int i = 0, k = (int) Math.ceil(length / 512f); i < k; i++) { + int l = i < k - 1 ? 512 : length - (i * 512); + in.readBytes(block, 0, l); + + decoder.write(block, 0, l); + + while (true) { + Packet p = decoder.decode(); + if (p == null) break; + + out.add(p); + } + } + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078Decoder.java b/src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078Decoder.java new file mode 100644 index 0000000..9602f03 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078Decoder.java @@ -0,0 +1,299 @@ +package cn.org.hentai.jtt1078.server.video; + +import cn.org.hentai.jtt1078.entity.Analyze; +import cn.org.hentai.jtt1078.entity.enums.ProtocolVersion; +import cn.org.hentai.jtt1078.util.ByteHolder; +import cn.org.hentai.jtt1078.util.ByteUtils; +import cn.org.hentai.jtt1078.util.Packet; +import lombok.extern.slf4j.Slf4j; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; + +/** + * JTT1078 Protocol Decoder + */ +@Slf4j +public class Jtt1078Decoder { + private final ByteHolder buffer = new ByteHolder(4096); + private static final byte[] HEAD1078 = {0x30, 0x31, 0x63, 0x64}; + private ProtocolVersion protocolVersion = ProtocolVersion.UNKNOWN; + + public void write(byte[] block) { + buffer.write(block); + } + + public void write(byte[] block, int startIndex, int length) { + byte[] buff = new byte[length]; + System.arraycopy(block, startIndex, buff, 0, length); + write(buff); + } + + public Packet decode() { + + if (buffer.size() < 4) return null; + + // Check header + if (!Arrays.equals(buffer.array(4), HEAD1078)) { +// log.warn("Invalid protocol header, expected:30316364, actual:{}", +// ByteUtils.toHexString(buffer.array(4))); + buffer.clear(); + return null; + } + + Packet packet; + // 绗竴娆¤嚜鍔ㄨ瘑鍒増鏈� + if (protocolVersion == ProtocolVersion.UNKNOWN) { + packet = autoDetectVersionAndParse(); + } else { + packet = decodePacket(protocolVersion); + + } + return packet; + } + + // 鑷姩妫�娴嬬増鏈苟瑙f瀽 + private Packet autoDetectVersionAndParse() { + // 灏濊瘯 2013 鐗堟湰 + Packet packet = decodePacket(ProtocolVersion.V2013); + if (packet != null) { + protocolVersion = ProtocolVersion.V2013; + Analyze analyze = parse16(packet.getBytes()); + System.out.printf("%s-%d device started output%n", analyze.getSim(), analyze.getCh()); + return packet; + } + + // 灏濊瘯 2019 鐗堟湰 + packet = decodePacket(ProtocolVersion.V2019); + if (packet != null) { + protocolVersion = ProtocolVersion.V2019; + Analyze analyze = parse19(packet.getBytes()); + System.out.printf("%s-%d device started output%n", analyze.getSim(), analyze.getCh()); + return packet; + } + return null; + } + + // 鏍规嵁鎸囧畾鐗堟湰瑙f瀽 + private Packet decodePacket(ProtocolVersion version) { + if (version == ProtocolVersion.UNKNOWN) { + return null; + } + + int dataTypeIndex = (version == ProtocolVersion.V2013) ? 15 : 19; + if (buffer.size() < dataTypeIndex + 1) return null; + + byte dataType = (byte) ((buffer.get(dataTypeIndex) >> 4) & 0x0F); + int dataLenIndex; + + switch (dataType) { + case 3: + dataLenIndex = dataTypeIndex + 9; + break; + case 4: + dataLenIndex = dataTypeIndex + 1; + break; + default: + dataLenIndex = dataTypeIndex + 13; + } + //log.info("dataType:"+dataType); + if (buffer.size() < dataLenIndex + 2) return null; + int dataLen = buffer.getShort(dataLenIndex) & 0xFFFF; + int packetLength = dataLenIndex + 2 + dataLen; + + if (buffer.size() < packetLength) return null; + + // Check next packet header + if (buffer.size() >= packetLength + 4) { + byte[] nextHeader = new byte[4]; + for (int i = 0; i < 4; i++) { + nextHeader[i] = buffer.get(packetLength + i); + } + if (!Arrays.equals(nextHeader, HEAD1078)) { + //log.warn("Invalid data type: {}", dataType); + buffer.clear(); + return null; + } + } + + byte[] packetData = new byte[packetLength]; + buffer.sliceInto(packetData, packetLength); + + //printPacketInfo(packetData, version); + + return Packet.create(packetData); + } + + // 瑙f瀽16鐗堟湰鏁版嵁 + private Analyze parse16(byte[] data) { + Analyze analyze = new Analyze(); + analyze.setPt((byte) (data[5] & 0x7F)); + analyze.setSn((short) (((data[6] & 0xFF) << 8) | (data[7] & 0xFF))); + + StringBuilder simBuilder = new StringBuilder(); + for (int i = 0; i < 6; i++) { + simBuilder.append(nextBcd(data, 8 + i)); + } + analyze.setSim(simBuilder.toString()); + + analyze.setCh(data[14]); + analyze.setDt((byte) ((data[15] >> 4) & 0x0F)); + analyze.setFi((byte) (data[15] & 0x0F)); + + int dataLenIndex; + switch (analyze.getDt()) { + case 3: + analyze.setTimestamp(setTimestamp(data, 16)); + dataLenIndex = 24; + break; + case 4: + dataLenIndex = 16; + break; + default: + analyze.setTimestamp(setTimestamp(data, 16)); + analyze.setLastIInterval((short) (((data[24] & 0xFF) << 8) | (data[25] & 0xFF))); + analyze.setLastInterval((short) (((data[26] & 0xFF) << 8) | (data[27] & 0xFF))); + dataLenIndex = 28; + } + + analyze.setDataLen((short) (((data[dataLenIndex] & 0xFF) << 8) | (data[dataLenIndex + 1] & 0xFF))); + byte[] payload = new byte[analyze.getDataLen()]; + System.arraycopy(data, dataLenIndex + 2, payload, 0, payload.length); + analyze.setData(payload); + + return analyze; + } + + // 瑙f瀽19鐗堟湰鏁版嵁 + private Analyze parse19(byte[] data) { + Analyze analyze = new Analyze(); + analyze.setPt((byte) (data[5] & 0x7F)); + analyze.setSn((short) (((data[6] & 0xFF) << 8) | (data[7] & 0xFF))); + + StringBuilder simBuilder = new StringBuilder(); + for (int i = 0; i < 10; i++) { + simBuilder.append(nextBcd(data, 8 + i)); + } + analyze.setSim(simBuilder.toString()); + + analyze.setCh(data[18]); + analyze.setDt((byte) ((data[19] >> 4) & 0x0F)); + analyze.setFi((byte) (data[19] & 0x0F)); + + int dataLenIndex; + switch (analyze.getDt()) { + case 3: + analyze.setTimestamp(setTimestamp(data, 20)); + dataLenIndex = 28; + break; + case 4: + dataLenIndex = 20; + break; + default: + analyze.setTimestamp(setTimestamp(data, 20)); + analyze.setLastIInterval((short) (((data[28] & 0xFF) << 8) | (data[29] & 0xFF))); + analyze.setLastInterval((short) (((data[30] & 0xFF) << 8) | (data[31] & 0xFF))); + dataLenIndex = 32; + } + + analyze.setDataLen((short) (((data[dataLenIndex] & 0xFF) << 8) | (data[dataLenIndex + 1] & 0xFF))); + byte[] payload = new byte[analyze.getDataLen()]; + System.arraycopy(data, dataLenIndex + 2, payload, 0, payload.length); + analyze.setData(payload); + + return analyze; + } + + private long setTimestamp(byte[] data, int offset) { + long timestamp = 0; + for (int i = 0; i < 8; i++) { + timestamp = (timestamp << 8) | (data[offset + i] & 0xFF); + } + return timestamp; + } + + private String nextBcd(byte[] data, int offset) { + byte val = data[offset]; + int ch1 = (val >> 4) & 0x0F; + int ch2 = val & 0x0F; + return String.format("%d%d", ch1, ch2); + } + + + private void printPacketInfo(byte[] packet, ProtocolVersion version) { + // System.out.println("Raw Packet (Hex):\n" + ByteUtils.toHexString(packet)); + + System.out.println("\n=== JTT1078 " + ((version == ProtocolVersion.V2013) ? "2013" : "2019") + " Protocol " + + "Packet ==="); + System.out.println("Header: " + ByteUtils.toHexString(packet, 0, 4)); + + Analyze analyze = (version == ProtocolVersion.V2013) ? parse16(packet) : parse19(packet); + System.out.println("Payload Type: " + analyze.getPt()); + System.out.println("Sequence Number: " + analyze.getSn()); + System.out.println("SIM: " + analyze.getSim()); + System.out.println("Channel: " + analyze.getCh()); + String dataType; + switch (analyze.getDt()) { + case 0: + dataType = "Video I Frame"; + break; + case 1: + dataType = "Video P Frame"; + break; + case 2: + dataType = "Video B Frame"; + break; + case 3: + dataType = "Audio Frame"; + break; + case 4: + dataType = "Transparent Data"; + break; + default: + dataType = "Unknown"; + break; + } + System.out.println("Data Type: " + dataType); + String FrameInfo; + switch (analyze.getFi()) { + case 0: + FrameInfo = "Atomic Packet"; + break; + case 1: + FrameInfo = "First Packet"; + break; + case 2: + FrameInfo = "Last Packet"; + break; + case 3: + FrameInfo = "Middle Packet"; + break; + default: + FrameInfo = "Unknown (" + analyze.getFi() + ")"; + break; + } + System.out.println("Frame Info: " + FrameInfo); + + if (analyze.getTimestamp() != 0) { + Instant instant = Instant.ofEpochMilli(analyze.getTimestamp()); + ZonedDateTime beijingTime = instant.atZone(ZoneOffset.ofHours(8)); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + System.out.println("Timestamp: " + formatter.format(beijingTime)); + } + + if (analyze.getLastIInterval() != 0) { + System.out.println("Last I Frame Interval: " + analyze.getLastIInterval()); + } + if (analyze.getLastInterval() != 0) { + System.out.println("Last Frame Interval: " + analyze.getLastInterval()); + } + + System.out.println("Data Length: " + analyze.getDataLen()); + System.out.println("=================================\n"); + + } +} + diff --git a/src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078Handler.java b/src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078Handler.java new file mode 100644 index 0000000..bff5278 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078Handler.java @@ -0,0 +1,150 @@ +package cn.org.hentai.jtt1078.server.video; + +import cn.org.hentai.jtt1078.publisher.Channel; +import cn.org.hentai.jtt1078.publisher.PublishManager; +import cn.org.hentai.jtt1078.server.Session; +import cn.org.hentai.jtt1078.server.SessionManager; +import cn.org.hentai.jtt1078.util.Packet; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.AttributeKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by matrixy on 2019/4/9. + */ +public class Jtt1078Handler extends SimpleChannelInboundHandler<Packet> +{ + static Logger logger = LoggerFactory.getLogger(Jtt1078Handler.class); + private static final AttributeKey<Session> SESSION_KEY = AttributeKey.valueOf("session-key"); + + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception { + io.netty.channel.Channel nettyChannel = ctx.channel(); + + // 1. 鍗忚鐗堟湰妫�娴� + boolean is2019Protocol = detectProtocolVersion(packet); + int simLength = is2019Protocol ? 10 : 6; + + // 2. 璇诲彇SIM鍗″彿鍜岄�氶亾鍙� + packet.seek(8); + StringBuilder simBuilder = new StringBuilder(); + for (int i = 0; i < simLength; i++) { + simBuilder.append(packet.nextBCD()); + } + String sim = simBuilder.toString(); + int channelPos = is2019Protocol ? 18 : 14; + packet.seek(channelPos); + int channel = packet.nextByte() & 0xff; + String tag = sim + "-" + channel; + // 3. 浼氳瘽绠$悊 + if (!SessionManager.contains(nettyChannel, "tag")) { + String publishKey = tag + ":video"; // 瑙嗛绔� + Channel chl = PublishManager.getInstance().open(publishKey); + SessionManager.set(nettyChannel, "tag", publishKey); + logger.info("start publishing: {} -> {}-{} (Protocol: {})", + Long.toHexString(chl.hashCode() & 0xffffffffL), + sim, channel, + is2019Protocol ? "2019" : "2016"); + } + + // 4. 鏁版嵁澶勭悊 + Integer sequence = SessionManager.get(nettyChannel, "video-sequence"); + if (sequence == null) sequence = 0; + + int dataTypePos = is2019Protocol ? 19 : 15; + packet.seek(dataTypePos); + int dataType = (packet.nextByte() >> 4) & 0x0f; + int pkType = packet.nextByte() & 0x0f; + + int baseOffset = is2019Protocol ? 32 : 28; + int lengthOffset = baseOffset; + if (dataType == 0x04) lengthOffset = baseOffset - 8 - 2 - 2; + else if (dataType == 0x03) lengthOffset = baseOffset - 4; + + int pt = packet.seek(5).nextByte() & 0x7f; + int timestampOffset = is2019Protocol ? 20 : 16; + + if (dataType == 0x00 || dataType == 0x01 || dataType == 0x02) { + if (pkType == 0 || pkType == 2) { + sequence += 1; + SessionManager.set(nettyChannel, "video-sequence", sequence); + } + long timestamp = packet.seek(timestampOffset).nextLong(); + byte[] videoData = packet.seek(lengthOffset + 2).nextBytes(); + PublishManager.getInstance().publishVideo(tag, sequence, timestamp, pt, videoData); + } + else if (dataType == 0x03) { + long timestamp = packet.seek(timestampOffset).nextLong(); + byte[] audioData = packet.seek(lengthOffset + 2).nextBytes(); + PublishManager.getInstance().publishAudio(tag, sequence, timestamp, pt, audioData); + } + } + + private boolean detectProtocolVersion(Packet packet) { + // 鏂规硶1锛氭鏌�6瀛楄妭SIM鍗″彿鍚庣殑濉厖 + packet.seek(8 + 6); // 2016鍗忚SIM鍗$粨鏉熶綅缃� + byte b14 = packet.nextByte(); + byte b15 = packet.nextByte(); + + // 濡傛灉鏄�2019鍗忚锛岃繖閲屽簲璇ユ槸0x00濉厖 + if ((b14 & 0xFF) == 0x00 && (b15 & 0xFF) == 0x00) { + return true; + } + + // 鏂规硶2锛氭鏌ユ暟鎹被鍨嬩綅缃殑鏈夋晥鎬� + packet.seek(15); + byte dtByte = packet.nextByte(); + int dataType = (dtByte >> 4) & 0x0F; + int frameType = dtByte & 0x0F; + + // 濡傛灉鏁版嵁绫诲瀷鎴栧抚绫诲瀷鏃犳晥锛屽彲鑳芥槸2019鍗忚 + if (dataType > 4 || frameType > 3) { + return true; + } + + // 榛樿杩斿洖2016鍗忚 + return false; + } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception + { + super.channelInactive(ctx); + release(ctx.channel()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception + { + // super.exceptionCaught(ctx, cause); + cause.printStackTrace(); + release(ctx.channel()); + ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state() == IdleState.READER_IDLE) { + String tag = SessionManager.get(ctx.channel(), "tag"); + logger.info("read timeout: {}",tag); + release(ctx.channel()); + } + } + } + + private void release(io.netty.channel.Channel channel) + { + String tag = SessionManager.get(channel, "tag"); + if (tag != null) + { + logger.info("close netty channel: {}", tag); + PublishManager.getInstance().close(tag); + } + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078MessageDecoder.java b/src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078MessageDecoder.java new file mode 100644 index 0000000..20c630b --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/server/video/Jtt1078MessageDecoder.java @@ -0,0 +1,38 @@ +package cn.org.hentai.jtt1078.server.video; + +import cn.org.hentai.jtt1078.util.Packet; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import java.util.List; + +/** + * Created by matrixy on 2019/4/9. + */ +public class Jtt1078MessageDecoder extends ByteToMessageDecoder +{ + byte[] block = new byte[4096]; + + private Jtt1078Decoder decoder=new Jtt1078Decoder(); + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception + { + int length = in.readableBytes(); + for (int i = 0, k = (int)Math.ceil(length / 512f); i < k; i++) + { + int l = i < k - 1 ? 512 : length - (i * 512); + in.readBytes(block, 0, l); + + decoder.write(block, 0, l); + + while (true) + { + Packet p = decoder.decode(); + if (p == null) break; + + out.add(p); + } + } + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java b/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java index 831e483..7d219f9 100644 --- a/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java +++ b/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java @@ -1,8 +1,7 @@ package cn.org.hentai.jtt1078.test; import cn.org.hentai.jtt1078.codec.G711Codec; -import cn.org.hentai.jtt1078.server.Jtt1078Decoder; -import cn.org.hentai.jtt1078.util.ByteUtils; +import cn.org.hentai.jtt1078.server.video.Jtt1078Decoder; import cn.org.hentai.jtt1078.util.Packet; import java.io.FileInputStream; diff --git a/src/main/java/cn/org/hentai/jtt1078/test/RTPGenerate.java b/src/main/java/cn/org/hentai/jtt1078/test/RTPGenerate.java index 92fae76..6c6abdf 100644 --- a/src/main/java/cn/org/hentai/jtt1078/test/RTPGenerate.java +++ b/src/main/java/cn/org/hentai/jtt1078/test/RTPGenerate.java @@ -1,9 +1,6 @@ package cn.org.hentai.jtt1078.test; -import cn.org.hentai.jtt1078.flv.FlvEncoder; -import cn.org.hentai.jtt1078.server.Jtt1078Decoder; -import cn.org.hentai.jtt1078.util.ByteHolder; -import cn.org.hentai.jtt1078.util.ByteUtils; +import cn.org.hentai.jtt1078.server.video.Jtt1078Decoder; import cn.org.hentai.jtt1078.util.Packet; import java.io.FileInputStream; diff --git a/src/main/java/cn/org/hentai/jtt1078/test/UnPack.java b/src/main/java/cn/org/hentai/jtt1078/test/UnPack.java index 36b75ce..a19e38a 100644 --- a/src/main/java/cn/org/hentai/jtt1078/test/UnPack.java +++ b/src/main/java/cn/org/hentai/jtt1078/test/UnPack.java @@ -1,6 +1,6 @@ package cn.org.hentai.jtt1078.test; -import cn.org.hentai.jtt1078.server.Jtt1078Decoder; +import cn.org.hentai.jtt1078.server.video.Jtt1078Decoder; import cn.org.hentai.jtt1078.util.Packet; import java.io.FileInputStream; diff --git a/src/main/java/cn/org/hentai/jtt1078/test/VideoPushTest.java b/src/main/java/cn/org/hentai/jtt1078/test/VideoPushTest.java index 818f4ce..411d345 100644 --- a/src/main/java/cn/org/hentai/jtt1078/test/VideoPushTest.java +++ b/src/main/java/cn/org/hentai/jtt1078/test/VideoPushTest.java @@ -12,10 +12,7 @@ { public static void main(String[] args) throws Exception { - // TODO setup Socket conn = new Socket("223.244.87.184", 1078); - // Socket conn = new Socket("47.104.204.210", 1078); - // Socket conn = new Socket("111.40.46.199", 7003); - Socket conn = new Socket("localhost", 7003); + Socket conn = new Socket("localhost", 1078); OutputStream os = conn.getOutputStream(); // InputStream fis = new FileInputStream("e:\\workspace\\enc_dec_audio\\streamax.bin"); diff --git a/src/main/java/cn/org/hentai/jtt1078/test/VideoServer.java b/src/main/java/cn/org/hentai/jtt1078/test/VideoServer.java index da08a8f..3557da5 100644 --- a/src/main/java/cn/org/hentai/jtt1078/test/VideoServer.java +++ b/src/main/java/cn/org/hentai/jtt1078/test/VideoServer.java @@ -1,6 +1,6 @@ package cn.org.hentai.jtt1078.test; -import cn.org.hentai.jtt1078.server.Jtt1078Decoder; +import cn.org.hentai.jtt1078.server.video.Jtt1078Decoder; import cn.org.hentai.jtt1078.util.Packet; import java.io.FileOutputStream; diff --git a/src/main/java/cn/org/hentai/jtt1078/util/AudioConverter.java b/src/main/java/cn/org/hentai/jtt1078/util/AudioConverter.java new file mode 100644 index 0000000..6b0981b --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/util/AudioConverter.java @@ -0,0 +1,70 @@ +package cn.org.hentai.jtt1078.util; + +import java.io.*; +import java.nio.file.*; + +public class AudioConverter { + + public static byte[] convertWebmToPcm(byte[] webmBytes) throws IOException, InterruptedException { + // 1. 鍒涘缓涓存椂鏂囦欢锛堣嚜鍔ㄥ垹闄わ級 + File inputFile = File.createTempFile("audio", ".webm"); + File outputFile = File.createTempFile("pcm", ".raw"); + + try { + // 2. 鍐欏叆杈撳叆鏂囦欢 + Files.write(inputFile.toPath(), webmBytes); + + // 3. 鏋勫缓 FFmpeg 鍛戒护 + ProcessBuilder pb = new ProcessBuilder( + "ffmpeg", + "-i", inputFile.getAbsolutePath(), // 杈撳叆鏂囦欢 + "-f", "s16le", // PCM 16-bit little-endian + "-acodec", "pcm_s16le", // 闊抽缂栫爜鍣� + "-ar", "8000", // 閲囨牱鐜� 8kHz + "-ac", "1", // 鍗曞0閬� + "-y", // 瑕嗙洊杈撳嚭鏂囦欢锛堝鏋滃瓨鍦級 + outputFile.getAbsolutePath() + ); + + // 4. 鎹曡幏閿欒娴侊紙鍏抽敭锛侊級 + pb.redirectErrorStream(true); // 鍚堝苟鏍囧噯杈撳嚭鍜岄敊璇祦 + Process process = pb.start(); + + // 5. 璇诲彇 FFmpeg 杈撳嚭锛堣皟璇曠敤锛� + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + System.out.println("[FFmpeg] " + line); // 鎵撳嵃 FFmpeg 鏃ュ織 + } + } + + // 6. 绛夊緟杞崲瀹屾垚 + int exitCode = process.waitFor(); + if (exitCode != 0) { + throw new IOException("FFmpeg 杞崲澶辫触锛岄��鍑虹爜: " + exitCode); + } + + // 7. 璇诲彇 PCM 鏁版嵁 + return Files.readAllBytes(outputFile.toPath()); + } finally { + // 8. 娓呯悊涓存椂鏂囦欢 + inputFile.delete(); + outputFile.delete(); + } + } + public static void convertWebmToMp3(File webmFile, File mp3File) throws IOException, InterruptedException { + ProcessBuilder pb = new ProcessBuilder( + "ffmpeg", "-y", + "-i", webmFile.getAbsolutePath(), + "-ar", "44100", // 杞崲涓烘爣鍑� MP3 閲囨牱鐜� + "-ac", "2", // 绔嬩綋澹� + "-b:a", "128k", + mp3File.getAbsolutePath() + ); + + pb.redirectErrorStream(true); + Process process = pb.start(); + process.waitFor(); + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/util/AudioFileWriter.java b/src/main/java/cn/org/hentai/jtt1078/util/AudioFileWriter.java new file mode 100644 index 0000000..15f353e --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/util/AudioFileWriter.java @@ -0,0 +1,30 @@ +package cn.org.hentai.jtt1078.util; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +public class AudioFileWriter { + private FileOutputStream fos; + private final File outputFile; + + public AudioFileWriter(String filename) throws IOException { + outputFile = new File(filename); + fos = new FileOutputStream(outputFile); + } + + public void append(byte[] data) throws IOException { + fos.write(data); + fos.flush(); + } + + public void close() throws IOException { + if (fos != null) { + fos.close(); + } + } + + public File getFile() { + return outputFile; + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/util/ByteHolder.java b/src/main/java/cn/org/hentai/jtt1078/util/ByteHolder.java index c7dd166..e72bf96 100644 --- a/src/main/java/cn/org/hentai/jtt1078/util/ByteHolder.java +++ b/src/main/java/cn/org/hentai/jtt1078/util/ByteHolder.java @@ -13,8 +13,6 @@ public ByteHolder(int bufferSize) { - System.out.print("ByteHolder(bufferSize:"+bufferSize+")"); - this.buffer = new byte[bufferSize]; } @@ -58,7 +56,6 @@ public void sliceInto(byte[] dest, int length) { - //System.out.println(); System.arraycopy(this.buffer, 0, dest, 0, length); // 寰�鍓嶆尓length涓綅 System.arraycopy(this.buffer, length, this.buffer, 0, this.size - length); @@ -96,4 +93,9 @@ int l = this.buffer[position + 1] & 0xff; return ((h << 8) | l) & 0xffff; } + // 鉁� 娣诲姞杩欎釜鏂规硶 + public byte[] peek(int length) { + if (size < length) return null; + return Arrays.copyOfRange(this.buffer, 0, length); + } } \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/util/ByteUtils.java b/src/main/java/cn/org/hentai/jtt1078/util/ByteUtils.java index 52214aa..58171bb 100644 --- a/src/main/java/cn/org/hentai/jtt1078/util/ByteUtils.java +++ b/src/main/java/cn/org/hentai/jtt1078/util/ByteUtils.java @@ -1,18 +1,14 @@ package cn.org.hentai.jtt1078.util; +import cn.org.hentai.jtt1078.flv.AudioTag; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + /** * Created by matrixy on 2017/8/22. */ public final class ByteUtils { - public static String toHexString(byte[] bytes, int offset, int length) { - StringBuilder sb = new StringBuilder(length * 3); - for (int i = offset; i < offset + length && i < bytes.length; i++) { - sb.append(String.format("%02X ", bytes[i])); - } - return sb.toString().trim(); - } - public static byte[] parse(String hexString) { String[] hexes = hexString.split(" "); @@ -201,4 +197,95 @@ } return dst; } + + public static String toHexString(byte[] bytes, int offset, int length) { + StringBuilder sb = new StringBuilder(length * 3); + for (int i = offset; i < offset + length && i < bytes.length; i++) { + sb.append(String.format("%02X ", bytes[i])); + } + return sb.toString().trim(); + } + + // 鍙�夛細鎵撳嵃鏁翠釜鏁扮粍 + public static String toHexString(byte[] bytes) { + return toHexString(bytes, 0, bytes.length); + } + + public static int getShort(byte[] bytes, int offset) { + return ((bytes[offset] & 0xFF) << 8) | (bytes[offset + 1] & 0xFF); + } + public static long getLong(byte[] bytes, int offset) { + return ((long)(bytes[offset] & 0xFF) << 56) | + ((long)(bytes[offset + 1] & 0xFF) << 48) | + ((long)(bytes[offset + 2] & 0xFF) << 40) | + ((long)(bytes[offset + 3] & 0xFF) << 32) | + ((long)(bytes[offset + 4] & 0xFF) << 24) | + ((long)(bytes[offset + 5] & 0xFF) << 16) | + ((long)(bytes[offset + 6] & 0xFF) << 8) | + ((long)(bytes[offset + 7] & 0xFF)); + } + + /** + * Flv 闊抽灏佽缂栫爜 + * + * @param audioTag 闊抽tag + * @return 瀛楄妭娴� + * @author xingkong + * @date 2021-09-07 17:02 + */ + public static ByteBuf encode(AudioTag audioTag) throws Exception { + ByteBuf buffer = Unpooled.buffer(); + if (audioTag == null) { + return buffer; + } + //----------------------tag header begin------- + buffer.writeByte(8); + buffer.writeMedium(audioTag.getTagDataSize()); + buffer.writeMedium(audioTag.getOffSetTimestamp() & 0xFFFFFF); + buffer.writeByte(audioTag.getOffSetTimestamp() >> 24); + buffer.writeMedium(audioTag.getStreamId()); + //---------------------tag header length 11--------- + //---------------------tag header end---------------- + byte formatAndRateAndSize = (byte) (audioTag.getFormat() << 4 | audioTag.getRate() << 2 | audioTag.getSize() << 1 | audioTag.getType()); + //-------------data begin------- + buffer.writeByte(formatAndRateAndSize); + buffer.writeBytes(audioTag.getData()); + //-------------data end ------- + //搴旇绛変簬11+tagDataSize + buffer.writeInt(buffer.writerIndex()); + return buffer; + } + + /** + * 璇诲彇bytebuf鍓╀笅鐨勫瓧鑺� + * + * @param msg 鍙鍙朾ytebuf + * @return 鍓╀笅鎵�鏈夌殑瀛楄妭 + * @author xingkong + * @date 2021-09-07 16:15 + */ + public static byte[] readReadableBytes(ByteBuf msg) { + byte[] content = new byte[msg.readableBytes()]; + msg.readBytes(content); + msg.release(); + return content; + } + /** + * 杞崲鏁版嵁 杞崲涓烘垜浠渶瑕佺殑鎶ユ枃褰㈠紡 + * + * @param data 甯﹁浆鎹㈢殑鎶ユ枃 + * @return 杞崲濂界殑鎶ユ枃 + * @author xingkong + * @date 2021/9/8 + */ + public static byte[] make(byte[] data) { + ByteBuf buffer = Unpooled.buffer(); + buffer.writeBytes(String.format("%x\r\n", data.length).getBytes()); + buffer.writeBytes(data); + buffer.writeByte((byte) '\r'); + buffer.writeByte((byte) '\n'); + return readReadableBytes(buffer); + } + + } diff --git a/src/main/java/cn/org/hentai/jtt1078/util/G711Util.java b/src/main/java/cn/org/hentai/jtt1078/util/G711Util.java new file mode 100644 index 0000000..f3a56d5 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/util/G711Util.java @@ -0,0 +1,33 @@ +package cn.org.hentai.jtt1078.util; + +public class G711Util { + + // 浼樺寲鐨凣.711A瑙g爜鏂规硶 + public static byte[] decodeG711AFast(byte[] alawData) { + byte[] pcmData = new byte[alawData.length * 2]; + for (int i = 0; i < alawData.length; i++) { + short pcm = ALAW_TABLE[alawData[i] & 0xFF]; + pcmData[i * 2] = (byte) (pcm & 0xFF); + pcmData[i * 2 + 1] = (byte) (pcm >> 8); + } + return pcmData; + } + + // A-law瑙g爜鏌ユ壘琛� + private static final short[] ALAW_TABLE = new short[256]; + + static { + for (int i = 0; i < 256; i++) { + int alaw = i ^ 0x55; + int sign = alaw & 0x80; + int exponent = (alaw & 0x70) >> 4; + int data = alaw & 0x0F; + + data = (data << 4) + 8; + if (exponent != 0) data += 0x100; + if (exponent > 1) data <<= (exponent - 1); + + ALAW_TABLE[i] = (short) (sign == 0 ? data : -data); + } + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/util/Packet.java b/src/main/java/cn/org/hentai/jtt1078/util/Packet.java index e235eab..16d4c4d 100644 --- a/src/main/java/cn/org/hentai/jtt1078/util/Packet.java +++ b/src/main/java/cn/org/hentai/jtt1078/util/Packet.java @@ -12,7 +12,7 @@ int maxSize = 0; public byte[] data; - protected Packet() + public Packet() { // do nothing here.. } diff --git a/src/main/java/cn/org/hentai/jtt1078/util/TimeUtils.java b/src/main/java/cn/org/hentai/jtt1078/util/TimeUtils.java new file mode 100644 index 0000000..cbaec82 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/util/TimeUtils.java @@ -0,0 +1,46 @@ +package cn.org.hentai.jtt1078.util; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; + +public class TimeUtils { + + /** + * 灏�8瀛楄妭鏃堕棿鎴冲瓧鑺傛暟缁勶紙澶х搴忥級杞垚鏍煎紡鍖栨椂闂村瓧绗︿覆锛圲TC鏃跺尯锛� + * @param bytes 8瀛楄妭鏁扮粍锛岃〃绀鸿嚜Unix绾厓浠ユ潵鐨勬绉掓暟 + * @return 鏍煎紡鍖栨椂闂村瓧绗︿覆锛屼緥濡傦細"2025-07-30 12:34:56" + */ + public static String bytes8ToTimeString(byte[] bytes) { + if (bytes == null || bytes.length != 8) { + throw new IllegalArgumentException("瀛楄妭鏁扮粍蹇呴』涓�8瀛楄妭闀垮害"); + } + // 榛樿澶х瀛楄妭搴忚鍙杔ong + long timestampMillis = ByteBuffer.wrap(bytes).getLong(); + + Instant instant = Instant.ofEpochMilli(timestampMillis); + ZonedDateTime utcTime = instant.atZone(ZoneOffset.UTC); + + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC); + return formatter.format(utcTime); + } + + public static long bytes8ToLong(byte[] bytes) { + if (bytes == null || bytes.length != 8) throw new IllegalArgumentException("Invalid byte array"); + return ((long)(bytes[0] & 0xFF) << 56) | + ((long)(bytes[1] & 0xFF) << 48) | + ((long)(bytes[2] & 0xFF) << 40) | + ((long)(bytes[3] & 0xFF) << 32) | + ((long)(bytes[4] & 0xFF) << 24) | + ((long)(bytes[5] & 0xFF) << 16) | + ((long)(bytes[6] & 0xFF) << 8) | + ((long)(bytes[7] & 0xFF)); + } + + public static void main(String[] args) { + // 绀轰緥8瀛楄妭鏃堕棿鎴� + byte[] tsBytes = new byte[]{0, 0, 1, 122, 68, 95, -54, 0}; // 渚嬪瓙 + System.out.println(bytes8ToTimeString(tsBytes)); + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/websocket/CustomHandshakeInterceptor.java b/src/main/java/cn/org/hentai/jtt1078/websocket/CustomHandshakeInterceptor.java new file mode 100644 index 0000000..ca2dbcb --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/websocket/CustomHandshakeInterceptor.java @@ -0,0 +1,30 @@ +package cn.org.hentai.jtt1078.websocket; + +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.HandshakeInterceptor; + +import java.util.Map; + +public class CustomHandshakeInterceptor implements HandshakeInterceptor { + + @Override + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, + WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { + //浠庤矾寰勪腑鑾峰彇userId + String path = request.getURI().getPath(); + String[] segments = path.split("/"); + if (segments.length > 2) { + String sim = segments[segments.length - 1]; + attributes.put("sim", sim); + } + + return true; + } + + @Override + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, + WebSocketHandler wsHandler, Exception ex) { + } +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/websocket/MyWebSocketHandler.java b/src/main/java/cn/org/hentai/jtt1078/websocket/MyWebSocketHandler.java new file mode 100644 index 0000000..c91a415 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/websocket/MyWebSocketHandler.java @@ -0,0 +1,84 @@ +package cn.org.hentai.jtt1078.websocket; + +import cn.org.hentai.jtt1078.entity.enums.ProtocolVersion; +import cn.org.hentai.jtt1078.server.audio.Jt1078AudioSenderService; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.BinaryMessage; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.AbstractWebSocketHandler; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +@Component +public class MyWebSocketHandler extends AbstractWebSocketHandler { + + private static final Map<String, List<WebSocketSession>> simSessionMap = new ConcurrentHashMap<>(); + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + System.out.println("Received text: " + message.getPayload()); + session.sendMessage(new TextMessage("Echo: " + message.getPayload())); + } + + @Override + protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) { + String sim = (String) session.getAttributes().get("sim"); + ByteBuffer payload = message.getPayload(); + // System.out.println("Received binary message, size: " + payload.remaining()); + ProtocolVersion protocolVersion = getProtocolVersion(sim); + // 杩欓噷澶勭悊浜岃繘鍒舵暟鎹� + Jt1078AudioSenderService.sendAudio(sim, 1, payload.array(), protocolVersion); + } + + //鏍规嵁sim鍗¢暱搴﹀垽鏂崗璁増鏈� + private ProtocolVersion getProtocolVersion(String sim) { + + ProtocolVersion version; + if (sim != null && sim.length() == 20) { // 10瀛楄妭BCD缂栫爜 鈫� 20浣嶅瓧绗︿覆 + version = ProtocolVersion.V2019; + } else if (sim != null && sim.length() == 12) { // 6瀛楄妭BCD缂栫爜 鈫� 12浣嶅瓧绗︿覆 + version = ProtocolVersion.V2013; + } else { + version = ProtocolVersion.UNKNOWN; // 鏃犳硶鍒ゆ柇 + System.err.println("Invalid SIM length, unable to determine JT1078 version"); + } + return version; + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + System.out.println("Connected: " + session.getId()); + String sim = (String) session.getAttributes().get("sim"); + simSessionMap.computeIfAbsent(sim, k -> new CopyOnWriteArrayList<>()).add(session); + System.out.println("缁戝畾SIM: " + sim + " -> " + session.getId()); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + System.out.println("Disconnected: " + session.getId()); + simSessionMap.values().forEach(list -> list.remove(session)); + } + + // 缁欏搴� SIM 鐨勬墍鏈夎繛鎺ュ箍鎾煶棰戞暟鎹紙G.711A鍘熷甯э級 + public static void sendAudioToSim(String sim, byte[] audioData) { + List<WebSocketSession> sessions = simSessionMap.get(sim); + if (sessions != null) { + for (WebSocketSession s : sessions) { + if (s.isOpen()) { + try { + s.sendMessage(new BinaryMessage(audioData)); + } catch (IOException e) { + System.out.println("鍙戦�侀煶棰戞暟鎹け璐�: " + e.getMessage()); + } + } + } + } + } +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/websocket/WebSocketConfig.java b/src/main/java/cn/org/hentai/jtt1078/websocket/WebSocketConfig.java new file mode 100644 index 0000000..84e042f --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/websocket/WebSocketConfig.java @@ -0,0 +1,26 @@ +package cn.org.hentai.jtt1078.websocket; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +import javax.annotation.Resource; + +@Configuration +@EnableWebSocket +public class WebSocketConfig implements WebSocketConfigurer { + + @Resource + private MyWebSocketHandler myWebSocketHandler; + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + //娣诲姞myWebSocketHandler娑堟伅澶勭悊瀵硅薄浠ュ強websocket杩炴帴鍦板潃 + registry.addHandler(myWebSocketHandler, "/ws/**") + //璁剧疆鍏佽璺ㄥ煙璁块棶 + .setAllowedOrigins("*") + //娣诲姞鎷︽埅鍣ㄥ彲瀹炵幇鐢ㄦ埛閾炬帴鍓嶈繘琛屾潈闄愭牎楠岀瓑鎿嶄綔 + .addInterceptors(new CustomHandshakeInterceptor()); + } +} \ No newline at end of file diff --git a/src/main/resources/app.properties b/src/main/resources/app.properties index e77c267..6da947c 100644 --- a/src/main/resources/app.properties +++ b/src/main/resources/app.properties @@ -1,19 +1,11 @@ - -#1078 -server.port = 7003 -#3333 server.http.port = 7004 -server.backlog = 1024 - # ffmpeg鍙墽琛屾枃浠惰矾寰勶紝鍙互鐣欑┖ ffmpeg.path = E:/jtt1078/ffmpeg/bin/ffmpeg.exe -//ffmpeg.path = D:/vehicle/ffmpeg/bin/ffmpeg.exe # 閰嶇疆rtmp鍦板潃灏嗗湪缁堢鍙戦�丷TP娑堟伅鍖呮椂锛岄澶栫殑鍚慠TMP鏈嶅姟鍣ㄦ帹娴� # TAG鐨勫舰寮忓氨鏄疭IM-CHANNEL锛屽13800138999-2 # 濡傛灉鐣欑┖灏嗕笉鍚慠TMP鏈嶅姟鍣ㄦ帹娴� - rtmp.url = -#rtmp.url = rtmp://47.104.204.210/live/{TAG} +# rtmp.url = rtmp://192.168.0.2/live/{TAG} # 璁剧疆涓簅n鏃讹紝鎺у埗鍙板皢杈撳嚭ffmpeg鐨勮緭鍑� -debug.mode = on \ No newline at end of file +debug.mode = off \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..7baff91 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,10 @@ +server: + port: 8080 + +jt1078: + video: + port: 7003 # 瑙嗛闊抽鏁版嵁鎺ュ叆 + http: + port: 7004 # HTTP Web 璁块棶绔彛 + talk: + port: 8003 # 璇煶瀵硅绔彛 diff --git a/src/main/resources/audio.html b/src/main/resources/audio.html index 9836595..e5a1b6c 100644 --- a/src/main/resources/audio.html +++ b/src/main/resources/audio.html @@ -6,8 +6,7 @@ <button>Play</button> </body> </html> -<!--<script src="//cdn.bootcss.com/jquery/3.4.1/jquery.min.js" type="text/javascript"></script>--> -<script src="https://code.jquery.com/jquery-3.7.1.min.js" integrity="sha256-/JqT3SQfawRcv/BIHPThkBvs0OEvtFFmqPF/lYI/Cxo=" crossorigin="anonymous"></script> +<script src="//cdn.bootcss.com/jquery/3.4.1/jquery.min.js" type="text/javascript"></script> <script type="text/javascript"> var streamOffset = 0; diff --git a/src/main/resources/multimedia.html b/src/main/resources/multimedia.html index 8b3de6c..313aadc 100644 --- a/src/main/resources/multimedia.html +++ b/src/main/resources/multimedia.html @@ -2,11 +2,10 @@ <head> <title>HTTP-Flv Test</title> </head> -<!--<script src="//cdn.bootcss.com/jquery/3.4.1/jquery.min.js" type="text/javascript"></script> -<script src="//cdn.bootcss.com/flv.js/1.5.0/flv.min.js" type="text/javascript"></script>--> - -<script src="https://code.jquery.com/jquery-3.7.1.min.js" integrity="sha256-/JqT3SQfawRcv/BIHPThkBvs0OEvtFFmqPF/lYI/Cxo=" crossorigin="anonymous"></script> -<script src="https://cdnjs.cloudflare.com/ajax/libs/flv.js/1.5.0/flv.js" integrity="sha512-3o5c2VekAg9ZZgcmddbSufUCJjqqY7uypQUa2JeCCgLjVKA1KOfqrMMGwbgP9tCszbZXhG7agevYBh2sm3I0JA==" crossorigin="anonymous" referrerpolicy="no-referrer"></script> +<!--<script src="//cdn.bootcss.com/jquery/3.4.1/jquery.min.js" type="text/javascript"></script>--> +<!--<script src="//cdn.bootcss.com/flv.js/1.5.0/flv.min.js" type="text/javascript"></script>--> +<script src="//code.jquery.com/jquery-3.4.1.min.js" type="text/javascript"></script> +<script src="//cdn.jsdelivr.net/npm/flv.js@1.5.0/dist/flv.min.js" type="text/javascript"></script> <body style="background-color: #666666"> <div id="xxoo" style="background-color: #333333;border-radius: 10px; overflow: hidden; width: 400px; height: 300px;"></div> <input id="tag"> diff --git a/src/main/resources/video.html b/src/main/resources/video.html index 8b4e24c..93d69d9 100644 --- a/src/main/resources/video.html +++ b/src/main/resources/video.html @@ -2,7 +2,7 @@ <head> <title>HTTP-Flv Test</title> </head> -<script type="text/javascript" src="js/flv.min.js"></script> +<script type="text/javascript" src="//cdn.bootcss.com/flv.js/1.5.0/flv.min.js"></script> <body> <video id="xxoo" autoplay></video> <script type="text/javascript"> -- Gitblit v1.9.3