package com.ruoyi.utils.mqtt;

import com.ruoyi.alarm.global.service.impl.GlobalAlarmServiceImpl;
import com.ruoyi.alarm.radar.service.ArdRadarService;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.ruoyi.statistical.service.StatisticalService;
import com.ruoyi.storage.minio.service.IStorageMinioEventService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * MQTT callback handler for a single topic subscription.
 *
 * <p>The callback subscribes to the configured topic every time a connection is
 * established, logs each received message, and keeps trying to reconnect after
 * the connection has been lost.
 *
 * @Description: MQTT callback handler
 * @ClassName: MqttOnceCallback
 * @Author: 刘苏义 (Liu Suyi)
 * @Date: 2023-05-29 09:55
 * @Version: 1.0
 **/
@Slf4j(topic = "mqtt")
public class MqttOnceCallback implements MqttCallbackExtended {

    /** Pause between two reconnect attempts, in milliseconds. */
    private static final long RECONNECT_INTERVAL_MS = 10000L;

    /** Client this callback is attached to; must not be {@code null}. */
    private final MqttClient client;

    /**
     * Connect options handed in by the creator. Retained so the constructor
     * signature stays unchanged; {@link MqttClient#reconnect()} is used for
     * recovery and does not take options.
     */
    private final MqttConnectOptions options;

    /** Topic to subscribe to after every successful connect; {@code null} disables subscribing. */
    private final String topic;

    /** Quality of service used for the subscription. */
    private final int qos;

    /**
     * Creates a callback bound to one client and one topic.
     *
     * @param client  the MQTT client the callback is registered on
     * @param options the connect options of that client
     * @param topic   the topic to subscribe to once connected, or {@code null} for none
     * @param qos     the quality of service for the subscription
     */
    public MqttOnceCallback(MqttClient client, MqttConnectOptions options, String topic, int qos) {
        this.client = client;
        this.options = options;
        this.topic = topic;
        this.qos = qos;
    }

    /**
     * Reconnects after the connection has been lost.
     *
     * <p>Sleeps {@value #RECONNECT_INTERVAL_MS} ms, then requests a reconnect, and
     * repeats until the client reports that it is connected. The loop stops early
     * when the calling thread is interrupted or when the client has been closed,
     * because neither situation can be recovered from by retrying.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.warn("车辆电磁锁MQTT连接断开,发起重连......", cause);
        while (!client.isConnected()) {
            try {
                Thread.sleep(RECONNECT_INTERVAL_MS);
                if (client.isConnected()) {
                    // Something else re-established the connection while we slept.
                    break;
                }
                client.reconnect();
            } catch (InterruptedException e) {
                // Restore the flag so the owner of this thread can see the interrupt,
                // and stop retrying instead of looping forever.
                Thread.currentThread().interrupt();
                return;
            } catch (MqttException e) {
                if (e.getReasonCode() == MqttException.REASON_CODE_CLIENT_CLOSED) {
                    // A closed client can never reconnect; retrying would spin forever.
                    log.warn("车辆电磁锁断开重连异常:{}", e.getMessage());
                    return;
                }
                // Logged at debug level: this can repeat on every cycle during an outage.
                log.debug("车辆电磁锁断开重连异常:{}", e.getMessage());
            } catch (RuntimeException e) {
                // Keep retrying on unexpected failures, but leave a trace of them.
                log.debug("车辆电磁锁断开重连异常:{}", e.getMessage());
            }
        }
    }

    /**
     * Called when delivery of a published message has completed. This callback
     * takes no action.
     *
     * @param token the delivery token of the completed message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Intentionally empty.
    }

    /**
     * Logs the topic and the UTF-8 decoded payload of a received message.
     *
     * @param topic   the topic the message arrived on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        try {
            String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
            log.info("【车辆锁主题】:{}【车辆锁信息】:{}", topic, payload);
        } catch (Exception e) {
            // Never let an exception escape into the MQTT client's callback thread.
            log.error("车辆电磁锁处理mqtt消息异常:", e);
        }
    }

    /**
     * Subscribes to the configured topic once a connection has been established.
     *
     * @param reconnect whether the connection is the result of an automatic reconnect
     * @param serverURI the URI of the server the connection was made to
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        if (null == topic) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.subscribe(topic, qos);
                log.info("--车辆电磁锁订阅主题::" + topic);
            } else {
                log.warn("车辆电磁锁mqtt连接失败");
            }
        } catch (Exception e) {
            log.error("车辆电磁锁mqtt订阅主题异常:", e);
        }
    }
}