aijinhui
2023-11-13 f4e20b2dc544a188d0591c901022383e84fa8e9e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package com.ruoyi.utils.mqtt;
 
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.ruoyi.sy.domain.ArdSyCarLock;
import com.ruoyi.sy.domain.ArdSyCarRtu;
import com.ruoyi.sy.gps31.PositionContainer;
import com.ruoyi.sy.service.ArdSyCarLockService;
import com.ruoyi.sy.service.ArdSyCarRtuService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
 
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
/**
 * @Description: mqtt回调处理类
 * @ClassName: MqttConsumerCallback
 * @Author: 刘苏义
 * @Date: 2023年05月29日9:55
 * @Version: 1.0
 **/
@Slf4j(topic = "mqtt")
public class MqttOnceCallback implements MqttCallbackExtended {
 
    private MqttClient client;
    private MqttConnectOptions options;
    private String topic;
    private int qos;
    //注入电子锁Bean
    ArdSyCarRtuService carRtuService = SpringUtils.getBean(ArdSyCarRtuService.class);
    ArdSyCarLockService carLockService = SpringUtils.getBean(ArdSyCarLockService.class);
 
    public MqttOnceCallback(MqttClient client, MqttConnectOptions options, String topic, int qos) {
        this.client = client;
        this.options = options;
        this.topic = topic;
        this.qos = qos;
    }
 
    /**
     * 断开重连
     */
    @Override
    public void connectionLost(Throwable cause) {
//        log.info("车辆电磁锁MQTT连接断开,发起重连......");
        while (!client.isConnected()) {
            try {
                Thread.sleep(10000);
                if (null != client && !client.isConnected()) {
                    client.reconnect();
//                    log.error("车辆电磁锁尝试重新连接");
                } else {
                    client.connect(options);
//                    log.error("车辆电磁锁尝试建立新连接");
                }
            } catch (Exception e) {
//                log.error("车辆电磁锁断开重连异常:" + e.getMessage());
            }
        }
    }
 
    /**
     * 接收到消息调用令牌中调用
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        //log.info("deliveryComplete---------" + Arrays.toString(topic));
    }
 
    /**
     * 消息处理
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        try {
//            System.out.println("【车辆锁主题】:"+topic+"【车辆锁信息】:"+new String(message.getPayload(), StandardCharsets.UTF_8));
            //出来回调数据
            List<ArdSyCarLock> ardSyCarLocks = carLockService.data(new String(message.getPayload(), StandardCharsets.UTF_8));
            //根据topic查询RTUID
            ArdSyCarRtu ardSyCarRtu = carRtuService.subscribeByCarId(topic);
            String RTU = ardSyCarRtu.getId();
            //根据RTUID查询锁
            List<ArdSyCarLock> ardSyCarLockList = carLockService.carLock(RTU);
            for (int i = 0; i < ardSyCarLocks.size(); i++) {
                ArdSyCarLock ardSyCarLockData = ardSyCarLocks.get(i);
                String relay = ardSyCarLockData.getRelay();
                String relayInfo = ardSyCarLockData.getRelayInfo();
                String current = ardSyCarLockData.getCurrents();
                String currentInfo = ardSyCarLockData.getCurrentInfo();
                for (int j = 0; j < ardSyCarLockList.size(); j++) {
                    ArdSyCarLock lock = ardSyCarLockList.get(j);
                    String relay1 = lock.getRelay();
                    String current1 = lock.getCurrents();
                    String relayInfo1 = lock.getRelayInfo();
                    String currentInfo1 = lock.getCurrentInfo();
                    Boolean upd = false;
                    if(relay.equals(relay1)){
                        if(!relayInfo.equals(relayInfo1)){
                            lock.setRelayInfo(relayInfo);
                            upd = true;
                        }
                    }
                    if(current.equals(current1)){
                        if(!currentInfo.equals(currentInfo1)){
                            lock.setCurrentInfo(currentInfo);
                            upd = true;
                        }
                    }
                    if(upd){
                        int num = carLockService.updLock(lock);
//                        log.debug("修改"+num+"数据");
                    }
                }
            }
//            //存入集合
//            Map<String,List<ArdSyCarLock>> map = PositionContainer.getTopicSubscribe();
//            if(map==null){
//                Map<String,List<ArdSyCarLock>> newMap = new HashMap<>();
//                newMap.put("topic",ardSyCarLocks);
//                PositionContainer.setTopicSubscribe(newMap);
//            }else {
//                for(Map.Entry entry : map.entrySet()){
//                    String mapTopic = (String) entry.getKey();
//                    if(topic.equals(mapTopic)){
//                        map.remove(entry.getKey());
//                    }
//                    Map<String,List<ArdSyCarLock>> newMap = new HashMap<>();
//                    newMap.put("topic",ardSyCarLocks);
//                    PositionContainer.setTopicSubscribe(newMap);
//                }
//            }
//            log.debug(String.valueOf(map));
        } catch (Exception e) {
            log.debug("车辆电磁锁处理mqtt消息异常:" + e);
        }
    }
 
    /**
     * mqtt连接后订阅主题
     */
    @Override
    public void connectComplete(boolean b, String s) {
        try {
            if (null != topic) {
                if (client.isConnected()) {
                    client.subscribe(topic, qos);
//                    log.info("车辆电磁锁mqtt连接成功" );
//                    log.info("--车辆电磁锁订阅主题::" + topic);
                } else {
                    log.info("车辆电磁锁mqtt连接失败");
                }
            }
        } catch (Exception e) {
            log.info("车辆电磁锁mqtt订阅主题异常:" + e);
        }
    }
}