From 60211f59d2c85053533ed151adb2bdc5348dd342 Mon Sep 17 00:00:00 2001
From: liusuyi <1951119284@qq.com>
Date: 星期四, 01 八月 2024 09:51:12 +0800
Subject: [PATCH] 修改:会话消息独立线程池;线程池参数统一设置
---
ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java | 2 +-
ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java | 28 ++++++++++++++++++++++++----
ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java | 2 +-
ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java | 2 ++
ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java | 4 +++-
5 files changed, 31 insertions(+), 7 deletions(-)
diff --git a/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java b/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java
index 7ce10af..95838d8 100644
--- a/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java
+++ b/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java
@@ -76,11 +76,11 @@
public Executor globalExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//閰嶇疆鏍稿績绾跨▼鏁�
- executor.setCorePoolSize(10);
+ executor.setCorePoolSize(corePoolSize);
//閰嶇疆鏈�澶х嚎绋嬫暟
- executor.setMaxPoolSize(15);
+ executor.setMaxPoolSize(maxPoolSize);
//閰嶇疆闃熷垪澶у皬
- executor.setQueueCapacity(25);
+ executor.setQueueCapacity(queueCapacity);
//绾跨▼鐨勫悕绉板墠缂�
executor.setThreadNamePrefix("globalExecutor-");
//绾跨▼娲昏穬鏃堕棿锛堢锛�
@@ -93,5 +93,25 @@
executor.initialize();
return executor;
}
-
+ @Bean("msgExecutor")
+ public Executor msgExecutor(){
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ //閰嶇疆鏍稿績绾跨▼鏁�
+ executor.setCorePoolSize(corePoolSize);
+ //閰嶇疆鏈�澶х嚎绋嬫暟
+ executor.setMaxPoolSize(maxPoolSize);
+ //閰嶇疆闃熷垪澶у皬
+ executor.setQueueCapacity(queueCapacity);
+ //绾跨▼鐨勫悕绉板墠缂�
+ executor.setThreadNamePrefix("msgExecutor-");
+ //绾跨▼娲昏穬鏃堕棿锛堢锛�
+ executor.setKeepAliveSeconds(keepAliveSeconds);
+ //绛夊緟鎵�鏈変换鍔$粨鏉熷悗鍐嶅叧闂嚎绋嬫睜
+ executor.setWaitForTasksToCompleteOnShutdown(true);
+ //璁剧疆鎷掔粷绛栫暐
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+ //鎵ц鍒濆鍖�
+ executor.initialize();
+ return executor;
+ }
}
diff --git a/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java b/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java
index 552c9d2..ebbecfa 100644
--- a/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java
+++ b/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java
@@ -26,7 +26,7 @@
@Slf4j(topic = "msgListener")
public class MsgListener {
- @Async("globalExecutor")
+ @Async("msgExecutor")
@EventListener(MessageEvent.class)
public void ArdCallHistoryEventListener(MessageEvent messageEvent) {
log.debug("鐩戝惉鍒颁細璇濇秷鎭簨浠�:"+messageEvent.getArdCallHistory().getContent());
diff --git a/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java b/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java
index 979a542..b28254c 100644
--- a/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java
+++ b/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java
@@ -16,6 +16,7 @@
import com.ruoyi.common.utils.uuid.IdUtils;
import com.ruoyi.system.service.ISysUserService;
import com.ruoyi.utils.websocket.util.WebSocketUtils;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
@@ -42,6 +43,7 @@
* @date 2024-07-03
*/
@Service
+@Slf4j(topic = "msgListener")
public class ArdCallHistoryServiceImpl implements IArdCallHistoryService {
@Resource
private ArdCallHistoryMapper ardCallHistoryMapper;
diff --git a/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java b/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java
index f89b53d..566696f 100644
--- a/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java
+++ b/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java
@@ -1,5 +1,6 @@
package com.ruoyi.utils.mqtt;
+import com.ruoyi.alarm.global.service.IGlobalAlarmService;
import com.ruoyi.alarm.global.service.impl.GlobalAlarmServiceImpl;
import com.ruoyi.alarm.radar.service.ArdRadarService;
import com.ruoyi.common.utils.spring.SpringUtils;
@@ -73,8 +74,9 @@
// subscribe鍚庡緱鍒扮殑娑堟伅浼氭墽琛屽埌杩欓噷闈�
log.debug("鎺ユ敹娑堟伅 銆愪富棰樸��:" + topic + " 銆愬唴瀹广��:" + new String(message.getPayload(), StandardCharsets.UTF_8));
//杩涜涓氬姟澶勭悊(鎺ユ敹鎶ヨ鏁版嵁)
- GlobalAlarmServiceImpl globalAlarmService = SpringUtils.getBean(GlobalAlarmServiceImpl.class);
+ IGlobalAlarmService globalAlarmService = SpringUtils.getBean(IGlobalAlarmService.class);
globalAlarmService.receiveAlarm(topic, new String(message.getPayload(), StandardCharsets.UTF_8));
+
if (topic.equals("minioEvent"))
{
IStorageMinioEventService storageMinioEventService = SpringUtils.getBean(IStorageMinioEventService.class);
diff --git a/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java b/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java
index 4583f87..18606e1 100644
--- a/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java
+++ b/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java
@@ -23,7 +23,7 @@
public final class WebSocketUtils {
// 瀛樺偍 websocket session
- public static final Map<String, Session> ONLINE_USER_SESSIONS = new ConcurrentHashMap<>();
+ public static final ConcurrentMap<String, Session> ONLINE_USER_SESSIONS = new ConcurrentHashMap<>();
//瀛樺偍鎴块棿
public static final ConcurrentHashMap<String, Set<String>> ROOM_USER_SET = new ConcurrentHashMap<>();
--
Gitblit v1.9.3