package com.ruoyi.alarm.global.service.impl; import com.ruoyi.alarm.global.domain.GuidePriorityQueue; import com.ruoyi.alarm.global.domain.GuideTask; import com.ruoyi.common.utils.StringUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; /** * @ClassName QueueManager * @Description: 队列管理 * @Author 刘苏义 * @Date 2023/6/29 21:09 * @Version 1.0 */ @Component @Slf4j(topic = "guideQueue") public class QueueManager { @Autowired private QueueTaskExecutor taskExecutor; private Map threadMap = new ConcurrentHashMap<>(); /*内部静态类*/ private static class TaskThread { private Thread thread; private GuideTask currentTask; public TaskThread(Thread thread, GuideTask currentTask) { this.thread = thread; this.currentTask = currentTask; } public Thread getThread() { return thread; } public GuideTask getCurrentTask() { return currentTask; } public void setCurrentTask(GuideTask task) { this.currentTask = task; } } /** * 引导任务入队 * 刘苏义 * 2023/6/30 8:57 */ public void addTaskToQueue(String cameraId, GuideTask task) { /*通过相机ID获取引导队列,并将引导任务加入队列*/ PriorityBlockingQueue guideTaskQueue = GuidePriorityQueue.cameraQueueMap.get(cameraId); if (StringUtils.isNull(guideTaskQueue)) { log.info("相机未登录,没有队列,无法入队引导"); return; } log.debug("新任务入队:" + task.getAlarmId()); guideTaskQueue.add(task); /*获取该相机的当前执行线程*/ TaskThread currentTaskThread = threadMap.get(cameraId); //如果队列当前线程正在运行,若入队任务优先级大于当前任务优先级,则终止当前线程 if (currentTaskThread != null){ if (task.getPriority() > currentTaskThread.getCurrentTask().getPriority()) { currentTaskThread.getThread().interrupt(); } if (!currentTaskThread.getThread().isAlive()) { Thread newThread = createThread(cameraId, guideTaskQueue); threadMap.put(cameraId, new TaskThread(newThread, task)); newThread.start(); } } //如果队列当前没有线程正在运行,则启动新线程 else { Thread newThread = createThread(cameraId, guideTaskQueue); threadMap.put(cameraId, new TaskThread(newThread, task)); newThread.start(); } } /** * 创建线程 * 刘苏义 * 2023/6/30 9:04 */ private Thread createThread(String queueName, PriorityBlockingQueue queue) { return new Thread(() -> { while (!Thread.currentThread().isInterrupted()) { try { GuideTask task = queue.take(); log.debug("取出队列数据:" + task.getAlarmId()); taskExecutor.processTask(task); // 更新线程的当前任务 TaskThread currentTaskThread = threadMap.get(queueName); if (currentTaskThread != null) { currentTaskThread.setCurrentTask(task); } } catch (InterruptedException e) { log.info("中断当前线程"); //线程中断,退出循环 Thread.currentThread().interrupt(); } } }, queueName); } }