package com.ruoyi.utils.qymqtt.newM; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Map; import com.ruoyi.common.utils.spring.SpringUtils; import com.ruoyi.sy.domain.ArdSyCarLock; import com.ruoyi.sy.domain.ArdSyCarRtu; import com.ruoyi.sy.service.ArdSyCarLockService; import com.ruoyi.sy.service.ArdSyCarRtuService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import com.alibaba.fastjson.JSON; @Slf4j(topic = "mqttCar") public class MessageCallback implements MqttCallback { private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //注入电子锁Bean ArdSyCarRtuService carRtuService = SpringUtils.getBean(ArdSyCarRtuService.class); ArdSyCarLockService carLockService = SpringUtils.getBean(ArdSyCarLockService.class); @Override//丢失了对服务端的连接后触发的回调 public void connectionLost(Throwable arg0) { // 资源的清理 重连 System.out.println(arg0.getMessage()); arg0.printStackTrace(); System.out.println("丢失了对服务端的连接"); } @Override //消息发布者消息发布完成产生的回调 public void deliveryComplete(IMqttDeliveryToken token) { log.debug("deliveryComplete---------" + token.isComplete()); int messageId = token.getMessageId(); String[] topics = token.getTopics(); String clientId = token.getClient().getClientId(); // byte[] msg = token.getMessage().getPayload(); String topicStr = ""; for(String topic : topics){ topicStr = topicStr + topic + ","; } topicStr = topicStr.substring(0, topicStr.length() - 1); log.debug("消息发布完成,messageId="+messageId+",topics="+topicStr+",clientId="+clientId); } @Override//消息订阅者收到消息后触发的回调 public void messageArrived(String topic, MqttMessage message) throws Exception { try { log.debug("【车辆锁主题】:"+topic+"【车辆锁信息】:"+new String(message.getPayload(), StandardCharsets.UTF_8)); //出来回调数据 List ardSyCarLocks = carLockService.data(new String(message.getPayload(), StandardCharsets.UTF_8)); //根据topic查询RTUID ArdSyCarRtu ardSyCarRtu = carRtuService.subscribeByCarId(topic); String RTU = ardSyCarRtu.getId(); //根据RTUID查询锁 List ardSyCarLockList = carLockService.carLock(RTU); for (int i = 0; i < ardSyCarLocks.size(); i++) { ArdSyCarLock ardSyCarLockData = ardSyCarLocks.get(i); String relay = ardSyCarLockData.getRelay(); Integer relayInfo = ardSyCarLockData.getRelayInfo(); String current = ardSyCarLockData.getCurrents(); Double currentInfo = ardSyCarLockData.getCurrentInfo(); for (int j = 0; j < ardSyCarLockList.size(); j++) { ArdSyCarLock lock = ardSyCarLockList.get(j); String relay1 = lock.getRelay(); String current1 = lock.getCurrents(); Integer relayInfo1 = lock.getRelayInfo(); Double currentInfo1 = lock.getCurrentInfo(); Boolean upd = false; if(relay.equals(relay1)){ if(!relayInfo.equals(relayInfo1)){ lock.setRelayInfo(relayInfo); upd = true; } } if(current.equals(current1)){ if(!currentInfo.equals(currentInfo1)){ lock.setCurrentInfo(currentInfo); upd = true; } } if(upd){ int num = carLockService.updLock(lock); // log.debug("修改"+num+"数据"); } } } } catch (Exception e) { log.debug("车辆电磁锁处理mqtt消息异常:" + e); } } }