package com.ruoyi.utils.mqtt; import com.ruoyi.common.annotation.Log; import com.ruoyi.common.utils.spring.SpringUtils; import com.ruoyi.sy.domain.ArdSyCarLock; import com.ruoyi.sy.domain.ArdSyCarRtu; import com.ruoyi.sy.gps31.PositionContainer; import com.ruoyi.sy.service.ArdSyCarLockService; import com.ruoyi.sy.service.ArdSyCarRtuService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @Description: mqtt回调处理类 * @ClassName: MqttConsumerCallback * @Author: 刘苏义 * @Date: 2023年05月29日9:55 * @Version: 1.0 **/ @Slf4j(topic = "mqtt") public class MqttOnceCallback implements MqttCallbackExtended { private MqttClient client; private MqttConnectOptions options; private String topic; private int qos; //注入电子锁Bean ArdSyCarRtuService carRtuService = SpringUtils.getBean(ArdSyCarRtuService.class); ArdSyCarLockService carLockService = SpringUtils.getBean(ArdSyCarLockService.class); public MqttOnceCallback(MqttClient client, MqttConnectOptions options, String topic, int qos) { this.client = client; this.options = options; this.topic = topic; this.qos = qos; } /** * 断开重连 */ @Override public void connectionLost(Throwable cause) { // log.info("车辆电磁锁MQTT连接断开,发起重连......"); while (!client.isConnected()) { try { Thread.sleep(10000); if (null != client && !client.isConnected()) { client.reconnect(); // log.error("车辆电磁锁尝试重新连接"); } else { client.connect(options); // log.error("车辆电磁锁尝试建立新连接"); } } catch (Exception e) { // log.error("车辆电磁锁断开重连异常:" + e.getMessage()); } } } /** * 接收到消息调用令牌中调用 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { //log.info("deliveryComplete---------" + Arrays.toString(topic)); } /** * 消息处理 */ @Override public void messageArrived(String topic, MqttMessage message) { try { // System.out.println("【车辆锁主题】:"+topic+"【车辆锁信息】:"+new String(message.getPayload(), StandardCharsets.UTF_8)); //出来回调数据 List ardSyCarLocks = carLockService.data(new String(message.getPayload(), StandardCharsets.UTF_8)); //根据topic查询RTUID ArdSyCarRtu ardSyCarRtu = carRtuService.subscribeByCarId(topic); String RTU = ardSyCarRtu.getId(); //根据RTUID查询锁 List ardSyCarLockList = carLockService.carLock(RTU); for (int i = 0; i < ardSyCarLocks.size(); i++) { ArdSyCarLock ardSyCarLockData = ardSyCarLocks.get(i); String relay = ardSyCarLockData.getRelay(); Integer relayInfo = ardSyCarLockData.getRelayInfo(); String current = ardSyCarLockData.getCurrents(); Double currentInfo = ardSyCarLockData.getCurrentInfo(); for (int j = 0; j < ardSyCarLockList.size(); j++) { ArdSyCarLock lock = ardSyCarLockList.get(j); String relay1 = lock.getRelay(); String current1 = lock.getCurrents(); Integer relayInfo1 = lock.getRelayInfo(); Double currentInfo1 = lock.getCurrentInfo(); Boolean upd = false; if(relay.equals(relay1)){ if(!relayInfo.equals(relayInfo1)){ lock.setRelayInfo(relayInfo); upd = true; } } if(current.equals(current1)){ if(!currentInfo.equals(currentInfo1)){ lock.setCurrentInfo(currentInfo); upd = true; } } if(upd){ int num = carLockService.updLock(lock); // log.debug("修改"+num+"数据"); } } } // //存入集合 // Map> map = PositionContainer.getTopicSubscribe(); // if(map==null){ // Map> newMap = new HashMap<>(); // newMap.put("topic",ardSyCarLocks); // PositionContainer.setTopicSubscribe(newMap); // }else { // for(Map.Entry entry : map.entrySet()){ // String mapTopic = (String) entry.getKey(); // if(topic.equals(mapTopic)){ // map.remove(entry.getKey()); // } // Map> newMap = new HashMap<>(); // newMap.put("topic",ardSyCarLocks); // PositionContainer.setTopicSubscribe(newMap); // } // } // log.debug(String.valueOf(map)); } catch (Exception e) { log.debug("车辆电磁锁处理mqtt消息异常:" + e); } } /** * mqtt连接后订阅主题 */ @Override public void connectComplete(boolean b, String s) { try { if (null != topic) { if (client.isConnected()) { client.subscribe(topic, qos); // log.info("车辆电磁锁mqtt连接成功" ); // log.info("--车辆电磁锁订阅主题::" + topic); } else { log.info("车辆电磁锁mqtt连接失败"); } } } catch (Exception e) { log.info("车辆电磁锁mqtt订阅主题异常:" + e); } } }