package com.ruoyi.utils.mqtt;
|
|
import com.ruoyi.alarm.global.service.IGlobalAlarmService;
|
import com.ruoyi.alarm.global.service.impl.GlobalAlarmServiceImpl;
|
import com.ruoyi.alarm.radar.service.ArdRadarService;
|
import com.ruoyi.common.utils.spring.SpringUtils;
|
import com.ruoyi.statistical.service.StatisticalService;
|
import com.ruoyi.storage.minio.service.IStorageMinioEventService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.*;
|
|
import java.nio.charset.StandardCharsets;
|
import java.util.Arrays;
|
|
/**
|
* @Description: mqtt回调处理类
|
* @ClassName: MqttConsumerCallback
|
* @Author: 刘苏义
|
* @Date: 2023年05月29日9:55
|
* @Version: 1.0
|
**/
|
@Slf4j(topic = "mqtt")
|
public class MqttConsumerCallback implements MqttCallbackExtended {
|
|
private MqttClient client;
|
private MqttConnectOptions options;
|
private String[] topic;
|
private int[] qos;
|
|
public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
|
this.client = client;
|
this.options = options;
|
this.topic = topic;
|
this.qos = qos;
|
}
|
|
/**
|
* 断开重连
|
*/
|
@Override
|
public void connectionLost(Throwable cause) {
|
// log.info("MQTT连接断开,发起重连......");
|
while (!client.isConnected()) {
|
try {
|
Thread.sleep(10000);
|
if (null != client && !client.isConnected()) {
|
client.reconnect();
|
// log.error("尝试重新连接");
|
} else {
|
client.connect(options);
|
log.error("尝试建立新连接");
|
}
|
} catch (Exception e) {
|
log.error("断开重连异常:" + e.getMessage());
|
}
|
}
|
}
|
|
/**
|
* 接收到消息调用令牌中调用
|
*/
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
|
//log.info("deliveryComplete---------" + Arrays.toString(topic));
|
}
|
|
/**
|
* 消息处理
|
*/
|
@Override
|
public void messageArrived(String topic, MqttMessage message) {
|
try {
|
// subscribe后得到的消息会执行到这里面
|
log.debug("接收消息 【主题】:" + topic + " 【内容】:" + new String(message.getPayload(), StandardCharsets.UTF_8));
|
//进行业务处理(接收报警数据)
|
IGlobalAlarmService globalAlarmService = SpringUtils.getBean(IGlobalAlarmService.class);
|
globalAlarmService.receiveAlarm(topic, new String(message.getPayload(), StandardCharsets.UTF_8));
|
|
if (topic.equals("minioEvent"))
|
{
|
IStorageMinioEventService storageMinioEventService = SpringUtils.getBean(IStorageMinioEventService.class);
|
storageMinioEventService.parseStorageMinioEvent(new String(message.getPayload(), StandardCharsets.UTF_8));
|
}
|
if (topic.equals("radarWellData"))
|
{
|
// System.out.println(new String(message.getPayload(), StandardCharsets.UTF_8));
|
StatisticalService statisticalService = SpringUtils.getBean(StatisticalService.class);
|
statisticalService.data(new String(message.getPayload(), StandardCharsets.UTF_8));
|
}
|
//接收雷达强制引导数据
|
if (topic.equals("radarForceGuide"))
|
{
|
ArdRadarService ardRadarService = SpringUtils.getBean(ArdRadarService.class);
|
ardRadarService.forceGuide(new String(message.getPayload(), StandardCharsets.UTF_8));
|
}
|
//接收雷达引导追踪数据
|
if (topic.equals("radarFollowGuide"))
|
{
|
ArdRadarService ardRadarService = SpringUtils.getBean(ArdRadarService.class);
|
ardRadarService.followGuide(new String(message.getPayload(), StandardCharsets.UTF_8));
|
}
|
} catch (Exception e) {
|
log.debug("处理mqtt消息异常:" + e);
|
}
|
}
|
|
/**
|
* mqtt连接后订阅主题
|
*/
|
@Override
|
public void connectComplete(boolean b, String s) {
|
try {
|
if (null != topic && null != qos) {
|
if (client.isConnected()) {
|
client.subscribe(topic, qos);
|
// log.info("mqtt连接成功" );
|
// log.info("订阅主题:" + Arrays.toString(topic));
|
} else {
|
log.info("mqtt连接失败");
|
}
|
}
|
} catch (Exception e) {
|
log.info("mqtt订阅主题异常:" + e);
|
}
|
}
|
}
|