From 60211f59d2c85053533ed151adb2bdc5348dd342 Mon Sep 17 00:00:00 2001
From: liusuyi <1951119284@qq.com>
Date: 星期四, 01 八月 2024 09:51:12 +0800
Subject: [PATCH] 修改:会话消息独立线程池;线程池参数统一设置

---
 ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java         |    2 +-
 ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java             |   28 ++++++++++++++++++++++++----
 ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java                   |    2 +-
 ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java |    2 ++
 ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java             |    4 +++-
 5 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java b/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java
index 7ce10af..95838d8 100644
--- a/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java
+++ b/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java
@@ -76,11 +76,11 @@
     public Executor globalExecutor(){
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         //閰嶇疆鏍稿績绾跨▼鏁�
-        executor.setCorePoolSize(10);
+        executor.setCorePoolSize(corePoolSize);
         //閰嶇疆鏈�澶х嚎绋嬫暟
-        executor.setMaxPoolSize(15);
+        executor.setMaxPoolSize(maxPoolSize);
         //閰嶇疆闃熷垪澶у皬
-        executor.setQueueCapacity(25);
+        executor.setQueueCapacity(queueCapacity);
         //绾跨▼鐨勫悕绉板墠缂�
         executor.setThreadNamePrefix("globalExecutor-");
         //绾跨▼娲昏穬鏃堕棿锛堢锛�
@@ -93,5 +93,25 @@
         executor.initialize();
         return executor;
     }
-
+    @Bean("msgExecutor")
+    public Executor msgExecutor(){
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        //閰嶇疆鏍稿績绾跨▼鏁�
+        executor.setCorePoolSize(corePoolSize);
+        //閰嶇疆鏈�澶х嚎绋嬫暟
+        executor.setMaxPoolSize(maxPoolSize);
+        //閰嶇疆闃熷垪澶у皬
+        executor.setQueueCapacity(queueCapacity);
+        //绾跨▼鐨勫悕绉板墠缂�
+        executor.setThreadNamePrefix("msgExecutor-");
+        //绾跨▼娲昏穬鏃堕棿锛堢锛�
+        executor.setKeepAliveSeconds(keepAliveSeconds);
+        //绛夊緟鎵�鏈変换鍔$粨鏉熷悗鍐嶅叧闂嚎绋嬫睜
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        //璁剧疆鎷掔粷绛栫暐
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        //鎵ц鍒濆鍖�
+        executor.initialize();
+        return executor;
+    }
 }
diff --git a/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java b/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java
index 552c9d2..ebbecfa 100644
--- a/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java
+++ b/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java
@@ -26,7 +26,7 @@
 @Slf4j(topic = "msgListener")
 public class MsgListener {
 
-    @Async("globalExecutor")
+    @Async("msgExecutor")
     @EventListener(MessageEvent.class)
     public void ArdCallHistoryEventListener(MessageEvent messageEvent) {
         log.debug("鐩戝惉鍒颁細璇濇秷鎭簨浠�:"+messageEvent.getArdCallHistory().getContent());
diff --git a/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java b/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java
index 979a542..b28254c 100644
--- a/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java
+++ b/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java
@@ -16,6 +16,7 @@
 import com.ruoyi.common.utils.uuid.IdUtils;
 import com.ruoyi.system.service.ISysUserService;
 import com.ruoyi.utils.websocket.util.WebSocketUtils;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.stereotype.Service;
@@ -42,6 +43,7 @@
  * @date 2024-07-03
  */
 @Service
+@Slf4j(topic = "msgListener")
 public class ArdCallHistoryServiceImpl implements IArdCallHistoryService {
     @Resource
     private ArdCallHistoryMapper ardCallHistoryMapper;
diff --git a/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java b/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java
index f89b53d..566696f 100644
--- a/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java
+++ b/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java
@@ -1,5 +1,6 @@
 package com.ruoyi.utils.mqtt;
 
+import com.ruoyi.alarm.global.service.IGlobalAlarmService;
 import com.ruoyi.alarm.global.service.impl.GlobalAlarmServiceImpl;
 import com.ruoyi.alarm.radar.service.ArdRadarService;
 import com.ruoyi.common.utils.spring.SpringUtils;
@@ -73,8 +74,9 @@
             // subscribe鍚庡緱鍒扮殑娑堟伅浼氭墽琛屽埌杩欓噷闈�
             log.debug("鎺ユ敹娑堟伅 銆愪富棰樸��:" + topic + " 銆愬唴瀹广��:" + new String(message.getPayload(), StandardCharsets.UTF_8));
             //杩涜涓氬姟澶勭悊(鎺ユ敹鎶ヨ鏁版嵁)
-            GlobalAlarmServiceImpl globalAlarmService = SpringUtils.getBean(GlobalAlarmServiceImpl.class);
+            IGlobalAlarmService globalAlarmService = SpringUtils.getBean(IGlobalAlarmService.class);
             globalAlarmService.receiveAlarm(topic, new String(message.getPayload(), StandardCharsets.UTF_8));
+
             if (topic.equals("minioEvent"))
             {
                 IStorageMinioEventService storageMinioEventService = SpringUtils.getBean(IStorageMinioEventService.class);
diff --git a/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java b/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java
index 4583f87..18606e1 100644
--- a/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java
+++ b/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java
@@ -23,7 +23,7 @@
 public final class WebSocketUtils {
 
     // 瀛樺偍 websocket session
-    public static final Map<String, Session> ONLINE_USER_SESSIONS = new ConcurrentHashMap<>();
+    public static final ConcurrentMap<String, Session> ONLINE_USER_SESSIONS = new ConcurrentHashMap<>();
     //瀛樺偍鎴块棿
     public static final ConcurrentHashMap<String, Set<String>> ROOM_USER_SET = new ConcurrentHashMap<>();
 

--
Gitblit v1.9.3