| package com.ruoyi.utils.mqtt; | 
|   | 
| import com.ruoyi.alarm.global.service.impl.GlobalAlarmServiceImpl; | 
| import com.ruoyi.alarm.radar.service.ArdRadarService; | 
| import com.ruoyi.common.utils.spring.SpringUtils; | 
| import com.ruoyi.statistical.service.StatisticalService; | 
| import com.ruoyi.storage.minio.service.IStorageMinioEventService; | 
| import lombok.extern.slf4j.Slf4j; | 
| import org.eclipse.paho.client.mqttv3.*; | 
|   | 
| import java.nio.charset.StandardCharsets; | 
| import java.util.Arrays; | 
|   | 
| /** | 
|  * @Description: mqtt回调处理类 | 
|  * @ClassName: MqttConsumerCallback | 
|  * @Author: 刘苏义 | 
|  * @Date: 2023年05月29日9:55 | 
|  * @Version: 1.0 | 
|  **/ | 
| @Slf4j(topic = "mqtt") | 
| public class MqttConsumerCallback implements MqttCallbackExtended { | 
|   | 
|     private MqttClient client; | 
|     private MqttConnectOptions options; | 
|     private String[] topic; | 
|     private int[] qos; | 
|   | 
|     public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) { | 
|         this.client = client; | 
|         this.options = options; | 
|         this.topic = topic; | 
|         this.qos = qos; | 
|     } | 
|   | 
|     /** | 
|      * 断开重连 | 
|      */ | 
|     @Override | 
|     public void connectionLost(Throwable cause) { | 
| //        log.info("MQTT连接断开,发起重连......"); | 
|         while (!client.isConnected()) { | 
|             try { | 
|                 Thread.sleep(10000); | 
|                 if (null != client && !client.isConnected()) { | 
|                     client.reconnect(); | 
| //                    log.error("尝试重新连接"); | 
|                 } else { | 
|                     client.connect(options); | 
|                     log.error("尝试建立新连接"); | 
|                 } | 
|             } catch (Exception e) { | 
|                 log.error("断开重连异常:" + e.getMessage()); | 
|             } | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * 接收到消息调用令牌中调用 | 
|      */ | 
|     @Override | 
|     public void deliveryComplete(IMqttDeliveryToken token) { | 
|   | 
|         //log.info("deliveryComplete---------" + Arrays.toString(topic)); | 
|     } | 
|   | 
|     /** | 
|      * 消息处理 | 
|      */ | 
|     @Override | 
|     public void messageArrived(String topic, MqttMessage message) { | 
|         try { | 
|             // subscribe后得到的消息会执行到这里面 | 
|             log.debug("接收消息 【主题】:" + topic + " 【内容】:" + new String(message.getPayload(), StandardCharsets.UTF_8)); | 
|             //进行业务处理(接收报警数据) | 
|             GlobalAlarmServiceImpl globalAlarmService = SpringUtils.getBean(GlobalAlarmServiceImpl.class); | 
|             globalAlarmService.receiveAlarm(topic, new String(message.getPayload(), StandardCharsets.UTF_8)); | 
|             if (topic.equals("minioEvent")) | 
|             { | 
|                 IStorageMinioEventService storageMinioEventService = SpringUtils.getBean(IStorageMinioEventService.class); | 
|                 storageMinioEventService.parseStorageMinioEvent(new String(message.getPayload(), StandardCharsets.UTF_8)); | 
|             } | 
|             if (topic.equals("radarWellData")) | 
|             { | 
| //                System.out.println(new String(message.getPayload(), StandardCharsets.UTF_8)); | 
|                 StatisticalService statisticalService = SpringUtils.getBean(StatisticalService.class); | 
|                 statisticalService.data(new String(message.getPayload(), StandardCharsets.UTF_8)); | 
|             } | 
|             //接收雷达强制引导数据 | 
|             if (topic.equals("radarForceGuide")) | 
|             { | 
|                 ArdRadarService ardRadarService = SpringUtils.getBean(ArdRadarService.class); | 
|                 ardRadarService.forceGuide(new String(message.getPayload(), StandardCharsets.UTF_8)); | 
|             } | 
|             //接收雷达引导追踪数据 | 
|             if (topic.equals("radarFollowGuide")) | 
|             { | 
|                 ArdRadarService ardRadarService = SpringUtils.getBean(ArdRadarService.class); | 
|                 ardRadarService.followGuide(new String(message.getPayload(), StandardCharsets.UTF_8)); | 
|             } | 
|         } catch (Exception e) { | 
|             log.debug("处理mqtt消息异常:" + e); | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * mqtt连接后订阅主题 | 
|      */ | 
|     @Override | 
|     public void connectComplete(boolean b, String s) { | 
|         try { | 
|             if (null != topic && null != qos) { | 
|                 if (client.isConnected()) { | 
|                     client.subscribe(topic, qos); | 
| //                    log.info("mqtt连接成功" ); | 
| //                    log.info("订阅主题:" + Arrays.toString(topic)); | 
|                 } else { | 
|                     log.info("mqtt连接失败"); | 
|                 } | 
|             } | 
|         } catch (Exception e) { | 
|             log.info("mqtt订阅主题异常:" + e); | 
|         } | 
|     } | 
| } |