package com.ruoyi.alarm.globalAlarm.service.impl;
|
|
import com.ruoyi.alarm.globalAlarm.domain.GuidePriorityQueue;
|
import com.ruoyi.alarm.globalAlarm.domain.GuideTask;
|
import org.apache.tomcat.util.threads.TaskThread;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.stereotype.Component;
|
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.PriorityBlockingQueue;
|
|
/**
|
* @ClassName QueueManager
|
* @Description: 队列管理
|
* @Author 刘苏义
|
* @Date 2023/6/29 21:09
|
* @Version 1.0
|
*/
|
|
@Component
|
public class QueueManager {
|
|
@Autowired
|
private QueueTaskExecutor taskExecutor;
|
private Map<String, TaskThread> threadMap = new ConcurrentHashMap<>();
|
|
/*内部静态类*/
|
private static class TaskThread {
|
private Thread thread;
|
private GuideTask currentTask;
|
|
public TaskThread(Thread thread, GuideTask currentTask) {
|
this.thread = thread;
|
this.currentTask = currentTask;
|
}
|
|
public Thread getThread() {
|
return thread;
|
}
|
|
public GuideTask getCurrentTask() {
|
return currentTask;
|
}
|
|
public void setCurrentTask(GuideTask task) {
|
this.currentTask = task;
|
}
|
}
|
|
/**
|
* 引导任务入队
|
* 刘苏义
|
* 2023/6/30 8:57
|
*/
|
public void addTaskToQueue(String cameraId, GuideTask task) {
|
/*通过相机ID获取引导队列,并将引导任务加入队列*/
|
PriorityBlockingQueue<GuideTask> guideTaskQueue = GuidePriorityQueue.cameraQueueMap.get(cameraId);
|
guideTaskQueue.add(task);
|
/*获取该相机的当前执行线程*/
|
TaskThread currentTaskThread = threadMap.get(cameraId);
|
//如果队列当前线程正在运行,若入队任务优先级大于当前任务优先级,则终止当前线程
|
if (currentTaskThread != null && task.getPriority() > currentTaskThread.getCurrentTask().getPriority()) {
|
currentTaskThread.getThread().interrupt();
|
}
|
//如果队列当前没有线程正在运行,则启动新线程
|
if (currentTaskThread == null || !currentTaskThread.getThread().isAlive()) {
|
Thread newThread = createThread(cameraId, guideTaskQueue);
|
threadMap.put(cameraId, new TaskThread(newThread, task));
|
newThread.start();
|
}
|
}
|
|
/**
|
* 创建线程
|
* 刘苏义
|
* 2023/6/30 9:04
|
*/
|
private Thread createThread(String queueName, PriorityBlockingQueue<GuideTask> queue) {
|
|
return new Thread(() -> {
|
while (!Thread.currentThread().isInterrupted()) {
|
try {
|
GuideTask task = queue.take();
|
taskExecutor.processTask(task);
|
GuidePriorityQueue.printPriorityQueue();
|
// 更新线程的当前任务
|
TaskThread currentTaskThread = threadMap.get(queueName);
|
if (currentTaskThread != null) {
|
currentTaskThread.setCurrentTask(task);
|
}
|
} catch (InterruptedException e) {
|
//线程中断,退出循环
|
Thread.currentThread().interrupt();
|
}
|
}
|
}, queueName);
|
}
|
}
|