package com.ruoyi.utils.qymqtt.oldM; import com.ruoyi.common.utils.spring.SpringUtils; import com.ruoyi.sy.domain.ArdSyCarLock; import com.ruoyi.sy.domain.ArdSyCarRtu; import com.ruoyi.sy.service.ArdSyCarLockService; import com.ruoyi.sy.service.ArdSyCarRtuService; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import java.nio.charset.StandardCharsets; import java.util.List; /** * 消费监听 */ @Slf4j public class PushCallback implements MqttCallback { private MqttClient client; //注入电子锁Bean ArdSyCarRtuService carRtuService = SpringUtils.getBean(ArdSyCarRtuService.class); ArdSyCarLockService carLockService = SpringUtils.getBean(ArdSyCarLockService.class); // private MqttClient client; // private MqttConnectOptions options; // private String topic; // private int qos; // public PushCallback(MqttClient client, MqttConnectOptions options, String topic, int qos) { // this.client = client; // this.options = options; // this.topic = topic; // this.qos = qos; // } @Override public void connectionLost(Throwable throwable) { if (client == null || !client.isConnected()) { //log.error("连接断开,正在重连...."); } } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { try { // System.out.println("【车辆锁主题】:"+topic+"【车辆锁信息】:"+new String(message.getPayload(), StandardCharsets.UTF_8)); //出来回调数据 List ardSyCarLocks = carLockService.data(new String(message.getPayload(), StandardCharsets.UTF_8)); //根据topic查询RTUID ArdSyCarRtu ardSyCarRtu = carRtuService.subscribeByCarId(topic); String RTU = ardSyCarRtu.getId(); //根据RTUID查询锁 List ardSyCarLockList = carLockService.carLock(RTU); for (int i = 0; i < ardSyCarLocks.size(); i++) { ArdSyCarLock ardSyCarLockData = ardSyCarLocks.get(i); String relay = ardSyCarLockData.getRelay(); Integer relayInfo = ardSyCarLockData.getRelayInfo(); String current = ardSyCarLockData.getCurrents(); Double currentInfo = ardSyCarLockData.getCurrentInfo(); for (int j = 0; j < ardSyCarLockList.size(); j++) { ArdSyCarLock lock = ardSyCarLockList.get(j); String relay1 = lock.getRelay(); String current1 = lock.getCurrents(); Integer relayInfo1 = lock.getRelayInfo(); Double currentInfo1 = lock.getCurrentInfo(); Boolean upd = false; if(relay.equals(relay1)){ if(!relayInfo.equals(relayInfo1)){ lock.setRelayInfo(relayInfo); upd = true; } } if(current.equals(current1)){ if(!currentInfo.equals(currentInfo1)){ lock.setCurrentInfo(currentInfo); upd = true; } } if(upd){ int num = carLockService.updLock(lock); // log.debug("修改"+num+"数据"); } } } } catch (Exception e) { log.debug("车辆电磁锁处理mqtt消息异常:" + e); } } @SneakyThrows @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); int messageId = token.getMessageId(); String[] topics = token.getTopics(); String clientId = token.getClient().getClientId(); // byte[] msg = token.getMessage().getPayload(); String topicStr = ""; for(String topic : topics){ topicStr = topicStr + topic + ","; } topicStr = topicStr.substring(0, topicStr.length() - 1); System.out.println("消息发布完成,messageId="+messageId+",topics="+topicStr+",clientId="+clientId); } }