| package com.ruoyi.alarm.global.service.impl; | 
|   | 
| import com.ruoyi.alarm.global.domain.GuidePriorityQueue; | 
| import com.ruoyi.alarm.global.domain.GuideTask; | 
| import com.ruoyi.common.utils.StringUtils; | 
| import lombok.extern.slf4j.Slf4j; | 
| import org.springframework.beans.factory.annotation.Autowired; | 
| import org.springframework.stereotype.Component; | 
|   | 
| import java.util.Map; | 
| import java.util.concurrent.ConcurrentHashMap; | 
| import java.util.concurrent.PriorityBlockingQueue; | 
|   | 
| /** | 
|  * @ClassName QueueManager | 
|  * @Description: 队列管理 | 
|  * @Author 刘苏义 | 
|  * @Date 2023/6/29 21:09 | 
|  * @Version 1.0 | 
|  */ | 
|   | 
| @Component | 
| @Slf4j(topic = "guideQueue") | 
| public class QueueManager { | 
|   | 
|     @Autowired | 
|     private QueueTaskExecutor taskExecutor; | 
|     private Map<String, TaskThread> threadMap = new ConcurrentHashMap<>(); | 
|   | 
|     /*内部静态类*/ | 
|     private static class TaskThread { | 
|         private Thread thread; | 
|         private GuideTask currentTask; | 
|   | 
|         public TaskThread(Thread thread, GuideTask currentTask) { | 
|             this.thread = thread; | 
|             this.currentTask = currentTask; | 
|         } | 
|   | 
|         public Thread getThread() { | 
|             return thread; | 
|         } | 
|   | 
|         public GuideTask getCurrentTask() { | 
|             return currentTask; | 
|         } | 
|   | 
|         public void setCurrentTask(GuideTask task) { | 
|             this.currentTask = task; | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * 引导任务入队 | 
|      * 刘苏义 | 
|      * 2023/6/30 8:57 | 
|      */ | 
|     public void addTaskToQueue(String cameraId, GuideTask task) { | 
|         /*通过相机ID获取引导队列,并将引导任务加入队列*/ | 
|         PriorityBlockingQueue<GuideTask> guideTaskQueue = GuidePriorityQueue.cameraQueueMap.get(cameraId); | 
|         if (StringUtils.isNull(guideTaskQueue)) { | 
|             log.info("相机未登录,没有队列,无法入队引导"); | 
|             return; | 
|         } | 
|         log.debug("新任务入队:" + task.getAlarmId()); | 
|         guideTaskQueue.add(task); | 
|         /*获取该相机的当前执行线程*/ | 
|         TaskThread currentTaskThread = threadMap.get(cameraId); | 
|         //如果队列当前线程正在运行,若入队任务优先级大于当前任务优先级,则终止当前线程 | 
|         if (currentTaskThread != null){ | 
|             if (task.getPriority() > currentTaskThread.getCurrentTask().getPriority()) { | 
|                 currentTaskThread.getThread().interrupt(); | 
|             } | 
|             if (!currentTaskThread.getThread().isAlive()) { | 
|                 Thread newThread = createThread(cameraId, guideTaskQueue); | 
|                 threadMap.put(cameraId, new TaskThread(newThread, task)); | 
|                 newThread.start(); | 
|             } | 
|         } | 
|         //如果队列当前没有线程正在运行,则启动新线程 | 
|         else { | 
|             Thread newThread = createThread(cameraId, guideTaskQueue); | 
|             threadMap.put(cameraId, new TaskThread(newThread, task)); | 
|             newThread.start(); | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * 创建线程 | 
|      * 刘苏义 | 
|      * 2023/6/30 9:04 | 
|      */ | 
|     private Thread createThread(String queueName, PriorityBlockingQueue<GuideTask> queue) { | 
|   | 
|         return new Thread(() -> { | 
|             while (!Thread.currentThread().isInterrupted()) { | 
|                 try { | 
|                     GuideTask task = queue.take(); | 
|                     log.debug("取出队列数据:" + task.getAlarmId()); | 
|                     taskExecutor.processTask(task); | 
|                     // 更新线程的当前任务 | 
|                     TaskThread currentTaskThread = threadMap.get(queueName); | 
|                     if (currentTaskThread != null) { | 
|                         currentTaskThread.setCurrentTask(task); | 
|                     } | 
|                 } catch (InterruptedException e) { | 
|                     log.info("中断当前线程"); | 
|                     //线程中断,退出循环 | 
|                     Thread.currentThread().interrupt(); | 
|                 } | 
|             } | 
|         }, queueName); | 
|     } | 
| } |