| | |
| | | |
| | | /** |
| | | * @ClassName QueueManager |
| | | * @Description: |
| | | * @Description: 队列管理 |
| | | * @Author 刘苏义 |
| | | * @Date 2023/6/29 21:09 |
| | | * @Version 1.0 |
| | |
| | | private QueueTaskExecutor taskExecutor; |
| | | private Map<String, TaskThread> threadMap = new ConcurrentHashMap<>(); |
| | | |
| | | /*内部静态类*/ |
| | | private static class TaskThread { |
| | | private Thread thread; |
| | | private GuideTask currentTask; |
| | |
| | | } |
| | | } |
| | | |
| | | public void addTaskToQueue(String queueName, GuideTask task) { |
| | | PriorityBlockingQueue<GuideTask> guideTaskQueue = GuidePriorityQueue.cameraQueueMap.get(queueName); |
| | | /** |
| | | * 引导任务入队 |
| | | * 刘苏义 |
| | | * 2023/6/30 8:57 |
| | | */ |
| | | public void addTaskToQueue(String cameraId, GuideTask task) { |
| | | /*通过相机ID获取引导队列,并将引导任务加入队列*/ |
| | | PriorityBlockingQueue<GuideTask> guideTaskQueue = GuidePriorityQueue.cameraQueueMap.get(cameraId); |
| | | guideTaskQueue.add(task); |
| | | TaskThread currentTaskThread = threadMap.get(queueName); |
| | | /*获取该相机的当前执行线程*/ |
| | | TaskThread currentTaskThread = threadMap.get(cameraId); |
| | | //如果队列当前线程正在运行,若入队任务优先级大于当前任务优先级,则终止当前线程 |
| | | if (currentTaskThread != null && task.getPriority() > currentTaskThread.getCurrentTask().getPriority()) { |
| | | currentTaskThread.getThread().interrupt(); |
| | | } |
| | | |
| | | // Start a new thread if no thread is currently running for the queue |
| | | //如果队列当前没有线程正在运行,则启动新线程 |
| | | if (currentTaskThread == null || !currentTaskThread.getThread().isAlive()) { |
| | | Thread newThread = createThread(queueName, guideTaskQueue); |
| | | threadMap.put(queueName, new TaskThread(newThread, task)); |
| | | Thread newThread = createThread(cameraId, guideTaskQueue); |
| | | threadMap.put(cameraId, new TaskThread(newThread, task)); |
| | | newThread.start(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 创建线程 |
| | | * 刘苏义 |
| | | * 2023/6/30 9:04 |
| | | */ |
| | | private Thread createThread(String queueName, PriorityBlockingQueue<GuideTask> queue) { |
| | | |
| | | return new Thread(() -> { |
| | | while (!Thread.currentThread().isInterrupted()) { |
| | | try { |
| | | GuideTask task = queue.take(); |
| | | taskExecutor.processTask(task); |
| | | GuidePriorityQueue.printPriorityQueue(); |
| | | // Update the current task for the thread |
| | | // 更新线程的当前任务 |
| | | TaskThread currentTaskThread = threadMap.get(queueName); |
| | | if (currentTaskThread != null) { |
| | | currentTaskThread.setCurrentTask(task); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | // Thread interrupted, exit the loop |
| | | //线程中断,退出循环 |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | } |