From 60211f59d2c85053533ed151adb2bdc5348dd342 Mon Sep 17 00:00:00 2001 From: liusuyi <1951119284@qq.com> Date: 星期四, 01 八月 2024 09:51:12 +0800 Subject: [PATCH] 修改:会话消息独立线程池;线程池参数统一设置 --- ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java | 2 +- ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java | 28 ++++++++++++++++++++++++---- ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java | 2 +- ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java | 2 ++ ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java | 4 +++- 5 files changed, 31 insertions(+), 7 deletions(-) diff --git a/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java b/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java index 7ce10af..95838d8 100644 --- a/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java +++ b/ard-work/src/main/java/com/ruoyi/alarm/config/AsyncConfiguration.java @@ -76,11 +76,11 @@ public Executor globalExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //閰嶇疆鏍稿績绾跨▼鏁� - executor.setCorePoolSize(10); + executor.setCorePoolSize(corePoolSize); //閰嶇疆鏈�澶х嚎绋嬫暟 - executor.setMaxPoolSize(15); + executor.setMaxPoolSize(maxPoolSize); //閰嶇疆闃熷垪澶у皬 - executor.setQueueCapacity(25); + executor.setQueueCapacity(queueCapacity); //绾跨▼鐨勫悕绉板墠缂� executor.setThreadNamePrefix("globalExecutor-"); //绾跨▼娲昏穬鏃堕棿锛堢锛� @@ -93,5 +93,25 @@ executor.initialize(); return executor; } - + @Bean("msgExecutor") + public Executor msgExecutor(){ + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + //閰嶇疆鏍稿績绾跨▼鏁� + executor.setCorePoolSize(corePoolSize); + //閰嶇疆鏈�澶х嚎绋嬫暟 + executor.setMaxPoolSize(maxPoolSize); + //閰嶇疆闃熷垪澶у皬 + executor.setQueueCapacity(queueCapacity); + //绾跨▼鐨勫悕绉板墠缂� + executor.setThreadNamePrefix("msgExecutor-"); + //绾跨▼娲昏穬鏃堕棿锛堢锛� + executor.setKeepAliveSeconds(keepAliveSeconds); + //绛夊緟鎵�鏈変换鍔$粨鏉熷悗鍐嶅叧闂嚎绋嬫睜 + executor.setWaitForTasksToCompleteOnShutdown(true); + //璁剧疆鎷掔粷绛栫暐 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + //鎵ц鍒濆鍖� + executor.initialize(); + return executor; + } } diff --git a/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java b/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java index 552c9d2..ebbecfa 100644 --- a/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java +++ b/ard-work/src/main/java/com/ruoyi/call/listener/MsgListener.java @@ -26,7 +26,7 @@ @Slf4j(topic = "msgListener") public class MsgListener { - @Async("globalExecutor") + @Async("msgExecutor") @EventListener(MessageEvent.class) public void ArdCallHistoryEventListener(MessageEvent messageEvent) { log.debug("鐩戝惉鍒颁細璇濇秷鎭簨浠�:"+messageEvent.getArdCallHistory().getContent()); diff --git a/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java b/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java index 979a542..b28254c 100644 --- a/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java +++ b/ard-work/src/main/java/com/ruoyi/call/service/impl/ArdCallHistoryServiceImpl.java @@ -16,6 +16,7 @@ import com.ruoyi.common.utils.uuid.IdUtils; import com.ruoyi.system.service.ISysUserService; import com.ruoyi.utils.websocket.util.WebSocketUtils; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; @@ -42,6 +43,7 @@ * @date 2024-07-03 */ @Service +@Slf4j(topic = "msgListener") public class ArdCallHistoryServiceImpl implements IArdCallHistoryService { @Resource private ArdCallHistoryMapper ardCallHistoryMapper; diff --git a/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java b/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java index f89b53d..566696f 100644 --- a/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java +++ b/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumerCallback.java @@ -1,5 +1,6 @@ package com.ruoyi.utils.mqtt; +import com.ruoyi.alarm.global.service.IGlobalAlarmService; import com.ruoyi.alarm.global.service.impl.GlobalAlarmServiceImpl; import com.ruoyi.alarm.radar.service.ArdRadarService; import com.ruoyi.common.utils.spring.SpringUtils; @@ -73,8 +74,9 @@ // subscribe鍚庡緱鍒扮殑娑堟伅浼氭墽琛屽埌杩欓噷闈� log.debug("鎺ユ敹娑堟伅 銆愪富棰樸��:" + topic + " 銆愬唴瀹广��:" + new String(message.getPayload(), StandardCharsets.UTF_8)); //杩涜涓氬姟澶勭悊(鎺ユ敹鎶ヨ鏁版嵁) - GlobalAlarmServiceImpl globalAlarmService = SpringUtils.getBean(GlobalAlarmServiceImpl.class); + IGlobalAlarmService globalAlarmService = SpringUtils.getBean(IGlobalAlarmService.class); globalAlarmService.receiveAlarm(topic, new String(message.getPayload(), StandardCharsets.UTF_8)); + if (topic.equals("minioEvent")) { IStorageMinioEventService storageMinioEventService = SpringUtils.getBean(IStorageMinioEventService.class); diff --git a/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java b/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java index 4583f87..18606e1 100644 --- a/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java +++ b/ard-work/src/main/java/com/ruoyi/utils/websocket/util/WebSocketUtils.java @@ -23,7 +23,7 @@ public final class WebSocketUtils { // 瀛樺偍 websocket session - public static final Map<String, Session> ONLINE_USER_SESSIONS = new ConcurrentHashMap<>(); + public static final ConcurrentMap<String, Session> ONLINE_USER_SESSIONS = new ConcurrentHashMap<>(); //瀛樺偍鎴块棿 public static final ConcurrentHashMap<String, Set<String>> ROOM_USER_SET = new ConcurrentHashMap<>(); -- Gitblit v1.9.3