package com.ruoyi.utils.mqtt;
|
|
import com.ruoyi.common.annotation.Log;
|
import com.ruoyi.common.utils.spring.SpringUtils;
|
import com.ruoyi.sy.domain.ArdSyCarLock;
|
import com.ruoyi.sy.domain.ArdSyCarRtu;
|
import com.ruoyi.sy.gps31.PositionContainer;
|
import com.ruoyi.sy.service.ArdSyCarLockService;
|
import com.ruoyi.sy.service.ArdSyCarRtuService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.*;
|
|
import java.nio.charset.StandardCharsets;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
|
/**
|
* @Description: mqtt回调处理类
|
* @ClassName: MqttConsumerCallback
|
* @Author: 刘苏义
|
* @Date: 2023年05月29日9:55
|
* @Version: 1.0
|
**/
|
@Slf4j(topic = "mqtt")
|
public class MqttOnceCallback implements MqttCallbackExtended {
|
|
private MqttClient client;
|
private MqttConnectOptions options;
|
private String topic;
|
private int qos;
|
//注入电子锁Bean
|
ArdSyCarRtuService carRtuService = SpringUtils.getBean(ArdSyCarRtuService.class);
|
ArdSyCarLockService carLockService = SpringUtils.getBean(ArdSyCarLockService.class);
|
|
public MqttOnceCallback(MqttClient client, MqttConnectOptions options, String topic, int qos) {
|
this.client = client;
|
this.options = options;
|
this.topic = topic;
|
this.qos = qos;
|
}
|
|
/**
|
* 断开重连
|
*/
|
@Override
|
public void connectionLost(Throwable cause) {
|
// log.info("车辆电磁锁MQTT连接断开,发起重连......");
|
while (!client.isConnected()) {
|
try {
|
Thread.sleep(10000);
|
if (null != client && !client.isConnected()) {
|
client.reconnect();
|
// log.error("车辆电磁锁尝试重新连接");
|
} else {
|
client.connect(options);
|
// log.error("车辆电磁锁尝试建立新连接");
|
}
|
} catch (Exception e) {
|
// log.error("车辆电磁锁断开重连异常:" + e.getMessage());
|
}
|
}
|
}
|
|
/**
|
* 接收到消息调用令牌中调用
|
*/
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
//log.info("deliveryComplete---------" + Arrays.toString(topic));
|
}
|
|
/**
|
* 消息处理
|
*/
|
@Override
|
public void messageArrived(String topic, MqttMessage message) {
|
try {
|
// System.out.println("【车辆锁主题】:"+topic+"【车辆锁信息】:"+new String(message.getPayload(), StandardCharsets.UTF_8));
|
//出来回调数据
|
List<ArdSyCarLock> ardSyCarLocks = carLockService.data(new String(message.getPayload(), StandardCharsets.UTF_8));
|
//根据topic查询RTUID
|
ArdSyCarRtu ardSyCarRtu = carRtuService.subscribeByCarId(topic);
|
String RTU = ardSyCarRtu.getId();
|
//根据RTUID查询锁
|
List<ArdSyCarLock> ardSyCarLockList = carLockService.carLock(RTU);
|
for (int i = 0; i < ardSyCarLocks.size(); i++) {
|
ArdSyCarLock ardSyCarLockData = ardSyCarLocks.get(i);
|
String relay = ardSyCarLockData.getRelay();
|
String relayInfo = ardSyCarLockData.getRelayInfo();
|
String current = ardSyCarLockData.getCurrents();
|
String currentInfo = ardSyCarLockData.getCurrentInfo();
|
for (int j = 0; j < ardSyCarLockList.size(); j++) {
|
ArdSyCarLock lock = ardSyCarLockList.get(j);
|
String relay1 = lock.getRelay();
|
String current1 = lock.getCurrents();
|
String relayInfo1 = lock.getRelayInfo();
|
String currentInfo1 = lock.getCurrentInfo();
|
Boolean upd = false;
|
if(relay.equals(relay1)){
|
if(!relayInfo.equals(relayInfo1)){
|
lock.setRelayInfo(relayInfo);
|
upd = true;
|
}
|
}
|
if(current.equals(current1)){
|
if(!currentInfo.equals(currentInfo1)){
|
lock.setCurrentInfo(currentInfo);
|
upd = true;
|
}
|
}
|
if(upd){
|
int num = carLockService.updLock(lock);
|
// log.debug("修改"+num+"数据");
|
}
|
}
|
}
|
// //存入集合
|
// Map<String,List<ArdSyCarLock>> map = PositionContainer.getTopicSubscribe();
|
// if(map==null){
|
// Map<String,List<ArdSyCarLock>> newMap = new HashMap<>();
|
// newMap.put("topic",ardSyCarLocks);
|
// PositionContainer.setTopicSubscribe(newMap);
|
// }else {
|
// for(Map.Entry entry : map.entrySet()){
|
// String mapTopic = (String) entry.getKey();
|
// if(topic.equals(mapTopic)){
|
// map.remove(entry.getKey());
|
// }
|
// Map<String,List<ArdSyCarLock>> newMap = new HashMap<>();
|
// newMap.put("topic",ardSyCarLocks);
|
// PositionContainer.setTopicSubscribe(newMap);
|
// }
|
// }
|
// log.debug(String.valueOf(map));
|
} catch (Exception e) {
|
log.debug("车辆电磁锁处理mqtt消息异常:" + e);
|
}
|
}
|
|
/**
|
* mqtt连接后订阅主题
|
*/
|
@Override
|
public void connectComplete(boolean b, String s) {
|
try {
|
if (null != topic) {
|
if (client.isConnected()) {
|
client.subscribe(topic, qos);
|
// log.info("车辆电磁锁mqtt连接成功" );
|
// log.info("--车辆电磁锁订阅主题::" + topic);
|
} else {
|
log.info("车辆电磁锁mqtt连接失败");
|
}
|
}
|
} catch (Exception e) {
|
log.info("车辆电磁锁mqtt订阅主题异常:" + e);
|
}
|
}
|
}
|