// liusuyi 2023-06-29 3daa378637a5ceeb4965b554f528a7a36e3d5c2a
package com.ruoyi.alarm.globalAlarm.service.impl;
 
import com.ruoyi.alarm.globalAlarm.domain.GuidePriorityQueue;
import com.ruoyi.alarm.globalAlarm.domain.GuideTask;
import org.apache.tomcat.util.threads.TaskThread;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
 
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
 
/**
 * @ClassName QueueManager
 * @Description:
 * @Author 刘苏义
 * @Date 2023/6/29 21:09
 * @Version 1.0
 */
 
@Component
public class QueueManager {
 
    @Autowired
    private QueueTaskExecutor taskExecutor;
    private Map<String, TaskThread> threadMap = new ConcurrentHashMap<>();
 
    private static class TaskThread {
        private Thread thread;
        private GuideTask currentTask;
 
        public TaskThread(Thread thread, GuideTask currentTask) {
            this.thread = thread;
            this.currentTask = currentTask;
        }
 
        public Thread getThread() {
            return thread;
        }
 
        public GuideTask getCurrentTask() {
            return currentTask;
        }
 
        public void setCurrentTask(GuideTask task) {
            this.currentTask = task;
        }
    }
 
    public void addTaskToQueue(String queueName, GuideTask task) {
        PriorityBlockingQueue<GuideTask> guideTaskQueue = GuidePriorityQueue.cameraQueueMap.get(queueName);
        guideTaskQueue.add(task);
        TaskThread currentTaskThread = threadMap.get(queueName);
        if (currentTaskThread != null && task.getPriority() > currentTaskThread.getCurrentTask().getPriority()) {
            currentTaskThread.getThread().interrupt();
        }
 
        // Start a new thread if no thread is currently running for the queue
        if (currentTaskThread == null || !currentTaskThread.getThread().isAlive()) {
            Thread newThread = createThread(queueName, guideTaskQueue);
            threadMap.put(queueName, new TaskThread(newThread, task));
            newThread.start();
        }
    }
 
    private Thread createThread(String queueName, PriorityBlockingQueue<GuideTask> queue) {
        return new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    GuideTask task = queue.take();
                    taskExecutor.processTask(task);
                    GuidePriorityQueue.printPriorityQueue();
                    // Update the current task for the thread
                    TaskThread currentTaskThread = threadMap.get(queueName);
                    if (currentTaskThread != null) {
                        currentTaskThread.setCurrentTask(task);
                    }
                } catch (InterruptedException e) {
                    // Thread interrupted, exit the loop
                    Thread.currentThread().interrupt();
                }
            }
        }, queueName);
    }
}