‘liusuyi’
2023-10-08 157c26f5188c7ed62a4547f7e3b5a5a3e3ed7729
优化mqtt生产者取消消费者订阅
已重命名2个文件
已修改7个文件
122 ■■■■■ 文件已修改
src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/utils/mqtt/MqttProducer.java 65 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/utils/tcp/ClientHandler.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-dev.yml 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java
@@ -1,10 +1,9 @@
package com.ard.alarm.apponekey.service.impl;
import java.util.Date;
import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.ard.utils.mqtt.MqttConsumer;
import com.ard.utils.mqtt.MqttProducer;
import com.ard.utils.uuid.IdUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -34,7 +33,7 @@
            ardAlarmApponekey.setId(IdUtils.fastSimpleUUID());
            ardAlarmApponekey.setCreateTime(new Date());
            printLog(ardAlarmApponekey);
            MqttConsumer.publish(2, false, "apponekey", JSON.toJSONString(ardAlarmApponekey));
            MqttProducer.publish(2, false, "apponekey", JSON.toJSONString(ardAlarmApponekey));
            return 1;
        } catch (Exception ex) {
            log.error("一键报警处理异常:" + ex.getMessage());
src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java
@@ -4,7 +4,7 @@
import com.ard.alarm.digitization.model.DataBridge;
import com.ard.alarm.digitization.service.DataBridgeService;
import com.ard.utils.jdbc.Query;
import com.ard.utils.mqtt.MqttConsumer;
import com.ard.utils.mqtt.MqttProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
@@ -52,7 +52,7 @@
                    data.setWellNo(elem[0]);
                    data.setAlarmType(elem[1]);
                    data.setAlarmTime(elem[2]);
                    MqttConsumer.publish(2, false, "digitization3", JSON.toJSONString(data));
                    MqttProducer.publish(2, false, "digitization3", JSON.toJSONString(data));
                }
            }
        } catch (Exception ex) {
src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java
@@ -4,13 +4,13 @@
import com.alibaba.fastjson2.JSONObject;
import com.ard.alarm.stealelec.domain.ArdAlarmStealelec;
import com.ard.utils.http.HttpUtils;
import com.ard.utils.mqtt.MqttConsumer;
import com.ard.utils.mqtt.MqttProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -68,7 +68,7 @@
                    if (tempList.contains(wd.getId())) {
                        continue;
                    }
                    MqttConsumer.publish(2, false, "stealelec", JSON.toJSONString(wd));
                    MqttProducer.publish(2, false, "stealelec", JSON.toJSONString(wd));
                    tempList.add(wd.getId());
                }
            }
src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java
@@ -2,7 +2,7 @@
import com.alibaba.fastjson2.JSON;
import com.ard.utils.other.DateUtils;
import com.ard.utils.mqtt.MqttConsumer;
import com.ard.utils.mqtt.MqttProducer;
import com.ard.utils.udp.NettyUdpServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@@ -73,7 +73,7 @@
                log.debug("位置:" + position);
                log.debug("值班人:" + watcher);
                log.debug("结束解析" + stop);
                MqttConsumer.publish(2, false, "tube", JSON.toJSONString(map));
                MqttProducer.publish(2, false, "tube", JSON.toJSONString(map));
            } else {
                log.error("数据异常");
            }
src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java
@@ -14,7 +14,7 @@
import com.ard.utils.hiksdk.domain.ExternalAlarmEventInfo;
import com.ard.utils.hiksdk.util.hikSdkUtil.HCNetSDK;
import com.ard.utils.other.DateUtils;
import com.ard.utils.mqtt.MqttConsumer;
import com.ard.utils.mqtt.MqttProducer;
import com.ard.utils.spring.SpringUtils;
import com.ard.utils.uuid.IdUtils;
import com.sun.jna.Pointer;
@@ -440,7 +440,7 @@
     */
    private void publishMqtt(CameraEventInfo info) {
        printLog(info);
        MqttConsumer.publish(2, false, "camera", JSON.toJSONString(info));
        MqttProducer.publish(2, false, "camera", JSON.toJSONString(info));
    }
    /**
@@ -448,7 +448,7 @@
     */
    private void publishMqtt(ExternalAlarmEventInfo info) {
        printLog(info);
        MqttConsumer.publish(2, false, "external", JSON.toJSONString(info));
        MqttProducer.publish(2, false, "external", JSON.toJSONString(info));
    }
    /**
@@ -456,7 +456,7 @@
     */
    private void publishMqtt(AccessControlHostEventInfo info) {
        printLog(info);
        MqttConsumer.publish(2, false, "accessControl", JSON.toJSONString(info));
        MqttProducer.publish(2, false, "accessControl", JSON.toJSONString(info));
    }
    /**
src/main/java/com/ard/utils/mqtt/MqttProducer.java
ÎļþÃû´Ó src/main/java/com/ard/utils/mqtt/MqttConsumer.java ÐÞ¸Ä
@@ -1,21 +1,19 @@
package com.ard.utils.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.expression.spel.ast.NullLiteral;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
 * @Description: mqtt消费客户端
 * @ClassName: MqttConsumer
 * @Description: mqtt生产客户端
 * @ClassName: MqttProducer
 * @Author: åˆ˜è‹ä¹‰
 * @Date: 2023å¹´05月29日9:55
 * @Version: 1.0
@@ -23,11 +21,9 @@
@Component
@Slf4j(topic = "mqtt")
@Order(1)
public class MqttConsumer implements ApplicationRunner {
public class MqttProducer implements ApplicationRunner {
    @Value("${spring.mqtt.enabled}")
    private Boolean MQTT_ENABLED;
    @Value("${spring.mqtt.topic}")
    private String MQTT_TOPIC;
    @Value("${spring.mqtt.host}")
    private String MQTT_HOST;
    @Value("${spring.mqtt.clientId}")
@@ -60,11 +56,8 @@
            getClient();
            // 2 è®¾ç½®é…ç½®
            MqttConnectOptions options = getOptions();
            String[] topic = MQTT_TOPIC.split(",");
            // 3 æ¶ˆæ¯å‘布质量
            int[] qos = getQos(topic.length);
            // 4 æœ€åŽè®¾ç½®
            create(options, topic, qos);
            // 3 æœ€åŽè®¾ç½®
            create(options);
        } catch (Exception e) {
            log.error("mqtt连接异常:" + e);
        }
@@ -97,69 +90,37 @@
        // è®¾ç½®ä¼šè¯å¿ƒè·³æ—¶é—´
        options.setKeepAliveInterval(MQTT_KEEP_ALIVE);
        // æ˜¯å¦æ¸…除session
        options.setCleanSession(true);
        options.setCleanSession(false);
        log.debug("--生成mqtt配置对象");
        return options;
    }
    /**
     * qos   --- 3 ---
     */
    public int[] getQos(int length) {
        int[] qos = new int[length];
        for (int i = 0; i < length; i++) {
            /**
             *  MQTT协议中有三种消息发布服务质量:
             *
             * QOS0: â€œè‡³å¤šä¸€æ¬¡â€ï¼Œæ¶ˆæ¯å‘布完全依赖底层 TCP/IP ç½‘络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
             * QOS1: â€œè‡³å°‘一次”,确保消息到达,但消息重复可能会发生。
             * QOS2: â€œåªæœ‰ä¸€æ¬¡â€ï¼Œç¡®ä¿æ¶ˆæ¯åˆ°è¾¾ä¸€æ¬¡ã€‚这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大
             */
            qos[i] = 1;
        }
        log.debug("--设置消息发布质量");
        return qos;
    }
    /**
     * è£…载各种实例和订阅主题  --- 4 ---
     * è¿žæŽ¥å¹¶è£…载回调 --- 3 ---
     */
    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
    public void create(MqttConnectOptions options) {
        try {
            client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
            client.setCallback(new MqttProducerCallback(client, options));
            log.debug("--添加回调处理类");
            client.connect(options);
        } catch (Exception e) {
            log.info("装载实例或订阅主题异常:" + e);
            log.info("连接并装载回调异常:" + e);
        }
    }
    /**
     * è®¢é˜…某个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            log.debug("topic:" + topic);
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * å‘布,非持久化
     * <p>
     * qos根据文档设置为1
     * qos根据文档设置为2
     *
     * @param topic
     * @param msg
     */
    public static void publish(String topic, String msg) {
        publish(1, false, topic, msg);
        publish(2, false, topic, msg);
    }
    /**
src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java
ÎļþÃû´Ó src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java ÐÞ¸Ä
@@ -2,6 +2,7 @@
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import java.util.Arrays;
/**
@@ -12,18 +13,14 @@
 * @Version: 1.0
 **/
@Slf4j(topic = "mqtt")
public class MqttConsumerCallback implements MqttCallbackExtended {
public class MqttProducerCallback implements MqttCallbackExtended {
    private MqttClient client;
    private MqttConnectOptions options;
    private String[] topic;
    private int[] qos;
    public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
    public MqttProducerCallback(MqttClient client, MqttConnectOptions options) {
        this.client = client;
        this.options = options;
        this.topic = topic;
        this.qos = qos;
    }
    /**
@@ -73,22 +70,18 @@
    }
    /**
     * mqtt连接后订阅主题
     * mqtt连接完成
     */
    @Override
    public void connectComplete(boolean b, String s) {
        try {
            if (null != topic && null != qos) {
                if (client.isConnected()) {
                    client.subscribe(topic, qos);
                    log.debug("mqtt连接成功");
                    log.debug("--订阅主题::" + Arrays.toString(topic));
                log.info("mqtt连接成功");
                } else {
                    log.debug("mqtt连接失败");
                }
                log.info("mqtt连接失败");
            }
        } catch (Exception e) {
            log.error("mqtt订阅主题异常:" + e);
            log.error("mqtt连接异常:" + e);
        }
    }
}
src/main/java/com/ard/utils/tcp/ClientHandler.java
@@ -6,7 +6,7 @@
import com.ard.alarm.radar.domain.RadarAlarmData;
import com.ard.utils.other.ByteUtils;
import com.ard.utils.other.GisUtils;
import com.ard.utils.mqtt.MqttConsumer;
import com.ard.utils.mqtt.MqttProducer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
@@ -432,10 +432,10 @@
            radarAlarmData.setRadarName(radarName);
            radarAlarmData.setAlarmTime(alarmTime);
            radarAlarmData.setArdAlarmRadars(radarAlarmInfos);
            MqttConsumer.publish(2, false, "radar", JSON.toJSONString(radarAlarmData));
            MqttProducer.publish(2, false, "radar", JSON.toJSONString(radarAlarmData));
            //抽油机状态MQTT队列
            radarAlarmData.setArdAlarmRadars(well);
            MqttConsumer.publish(2, false, "radarWellData", JSON.toJSONString(radarAlarmData));
            MqttProducer.publish(2, false, "radarWellData", JSON.toJSONString(radarAlarmData));
        } catch (Exception ex) {
            log.error("雷达报文解析异常:" + ex.getMessage());
        }
src/main/resources/application-dev.yml
@@ -39,10 +39,9 @@
  # mqtt配置
  mqtt:
    host: tcp://192.168.2.15:1883
    clientId: c227
    clientId: client-227
    username: admin
    password: admin
    topic: tube
    timeout: 100
    keepalive: 60
    enabled: true