package com.ruoyi.alarm.globalAlarm.service.impl;

import com.ruoyi.alarm.globalAlarm.domain.GuidePriorityQueue;
import com.ruoyi.alarm.globalAlarm.domain.GuideTask;
import org.apache.tomcat.util.threads.TaskThread;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;

/**
 * Queue manager: feeds guide tasks into per-camera priority queues and keeps one
 * worker thread per camera draining its queue.
 *
 * <p>A task that arrives with a higher priority than the task currently being
 * executed preempts it: the running worker is interrupted and a fresh worker is
 * started for that camera. The interrupted task is not put back on the queue.
 *
 * @ClassName QueueManager
 * @Description: Queue management
 * @Author 刘苏义 (Liu Suyi)
 * @Date 2023/6/29 21:09
 * @Version 1.0
 */
@Component
public class QueueManager {

    @Autowired
    private QueueTaskExecutor taskExecutor;

    /* Worker bookkeeping, keyed by camera id. */
    private final Map<String, TaskThread> threadMap = new ConcurrentHashMap<>();

    /*
     * Pairs a worker thread with the task it is currently executing.
     * NOTE: inside this class the simple name TaskThread refers to this nested
     * class, not to the imported org.apache.tomcat.util.threads.TaskThread.
     */
    private static class TaskThread {
        private final Thread thread;
        /* Written by the worker thread and read by callers of addTaskToQueue, hence volatile. */
        private volatile GuideTask currentTask;

        public TaskThread(Thread thread, GuideTask currentTask) {
            this.thread = thread;
            this.currentTask = currentTask;
        }

        public Thread getThread() {
            return thread;
        }

        /** Returns the task being executed, or {@code null} while the worker is idle. */
        public GuideTask getCurrentTask() {
            return currentTask;
        }

        public void setCurrentTask(GuideTask task) {
            this.currentTask = task;
        }
    }

    /**
     * Enqueues a guide task for the given camera and makes sure a worker thread is
     * draining that camera's queue.
     *
     * <p>If a live worker is executing a task whose priority is lower than the new
     * task's, that worker is interrupted and replaced by a new one. The method is
     * synchronized so that concurrent callers cannot both decide to start a worker
     * for the same camera.
     *
     * @param cameraId id of the camera; must have a queue in {@code GuidePriorityQueue.cameraQueueMap}
     * @param task     the guide task to enqueue
     * @throws IllegalArgumentException if no queue exists for {@code cameraId}
     * @author 刘苏义 (Liu Suyi)
     * @date 2023/6/30 8:57
     */
    public synchronized void addTaskToQueue(String cameraId, GuideTask task) {
        /* Look up the camera's guide queue and add the task to it. */
        PriorityBlockingQueue<GuideTask> guideTaskQueue = GuidePriorityQueue.cameraQueueMap.get(cameraId);
        if (guideTaskQueue == null) {
            throw new IllegalArgumentException("No guide queue registered for camera: " + cameraId);
        }
        guideTaskQueue.add(task);

        /* Worker currently registered for this camera, if any. */
        TaskThread worker = threadMap.get(cameraId);
        boolean needsNewWorker = worker == null || !worker.getThread().isAlive();

        if (!needsNewWorker) {
            // Preempt only a worker that is actually executing a lower-priority task;
            // an idle worker (current task == null) will pick the new task up by itself.
            GuideTask running = worker.getCurrentTask();
            if (running != null && task.getPriority() > running.getPriority()) {
                worker.getThread().interrupt();
                // The interrupted worker leaves its loop, so it must be replaced here;
                // checking isAlive() right after interrupt() would still report true.
                needsNewWorker = true;
            }
        }

        if (needsNewWorker) {
            Thread newThread = createThread(cameraId, guideTaskQueue);
            // Register before start() so the worker finds its own entry; it is idle until it takes a task.
            threadMap.put(cameraId, new TaskThread(newThread, null));
            newThread.start();
        }
    }

    /**
     * Creates (but does not start) a worker thread that repeatedly takes the head of
     * {@code queue} and hands it to the task executor.
     *
     * <p>The worker stops when it is interrupted or when another thread has been
     * registered in its place for {@code queueName}.
     *
     * @param queueName camera id; also used as the thread name and as the key into the worker map
     * @param queue     the priority queue this worker drains
     * @return the new, not yet started thread
     * @author 刘苏义 (Liu Suyi)
     * @date 2023/6/30 9:04
     */
    private Thread createThread(String queueName, PriorityBlockingQueue<GuideTask> queue) {
        return new Thread(() -> {
            Thread self = Thread.currentThread();
            while (!self.isInterrupted() && isRegisteredWorker(queueName, self)) {
                try {
                    GuideTask task = queue.take();
                    // Publish the task before running it so priority checks see what is really executing.
                    recordCurrentTask(queueName, self, task);
                    try {
                        taskExecutor.processTask(task);
                    } finally {
                        // Mark the worker idle again, whether the task finished or was aborted.
                        recordCurrentTask(queueName, self, null);
                    }
                    GuidePriorityQueue.printPriorityQueue();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop condition ends the worker.
                    self.interrupt();
                }
            }
        }, queueName);
    }

    /* True while the given thread is still the worker registered for the queue. */
    private boolean isRegisteredWorker(String queueName, Thread candidate) {
        TaskThread registered = threadMap.get(queueName);
        return registered != null && registered.getThread() == candidate;
    }

    /* Updates the current task of the queue's entry, but only if it still belongs to the given worker. */
    private void recordCurrentTask(String queueName, Thread worker, GuideTask task) {
        TaskThread registered = threadMap.get(queueName);
        if (registered != null && registered.getThread() == worker) {
            registered.setCurrentTask(task);
        }
    }
}