From 157c26f5188c7ed62a4547f7e3b5a5a3e3ed7729 Mon Sep 17 00:00:00 2001
From: ‘liusuyi’ <1951119284@qq.com>
Date: 星期日, 08 十月 2023 09:40:47 +0800
Subject: [PATCH] 优化mqtt生产者取消消费者订阅

---
 src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java |    5 -
 src/main/java/com/ard/utils/tcp/ClientHandler.java                                   |    6 +-
 src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java             |    6 +-
 src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java                    |    8 +-
 src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java                           |   25 +++-----
 src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java     |    4 
 src/main/resources/application-dev.yml                                               |    3 
 src/main/java/com/ard/utils/mqtt/MqttProducer.java                                   |   65 ++++-----------------
 src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java                       |    4 
 9 files changed, 39 insertions(+), 87 deletions(-)

diff --git a/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java b/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java
index f002e47..dba5f71 100644
--- a/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java
+++ b/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java
@@ -1,10 +1,9 @@
 package com.ard.alarm.apponekey.service.impl;
 
 import java.util.Date;
-import java.util.List;
 
 import com.alibaba.fastjson2.JSON;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
 import com.ard.utils.uuid.IdUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -34,7 +33,7 @@
             ardAlarmApponekey.setId(IdUtils.fastSimpleUUID());
             ardAlarmApponekey.setCreateTime(new Date());
             printLog(ardAlarmApponekey);
-            MqttConsumer.publish(2, false, "apponekey", JSON.toJSONString(ardAlarmApponekey));
+            MqttProducer.publish(2, false, "apponekey", JSON.toJSONString(ardAlarmApponekey));
             return 1;
         } catch (Exception ex) {
             log.error("涓�閿姤璀﹀鐞嗗紓甯�:" + ex.getMessage());
diff --git a/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java b/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java
index 981f2a8..8b04e6f 100644
--- a/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java
+++ b/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java
@@ -4,7 +4,7 @@
 import com.ard.alarm.digitization.model.DataBridge;
 import com.ard.alarm.digitization.service.DataBridgeService;
 import com.ard.utils.jdbc.Query;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -52,7 +52,7 @@
                     data.setWellNo(elem[0]);
                     data.setAlarmType(elem[1]);
                     data.setAlarmTime(elem[2]);
-                    MqttConsumer.publish(2, false, "digitization3", JSON.toJSONString(data));
+                    MqttProducer.publish(2, false, "digitization3", JSON.toJSONString(data));
                 }
             }
         } catch (Exception ex) {
diff --git a/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java b/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java
index ced8991..2714637 100644
--- a/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java
+++ b/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java
@@ -4,13 +4,13 @@
 import com.alibaba.fastjson2.JSONObject;
 import com.ard.alarm.stealelec.domain.ArdAlarmStealelec;
 import com.ard.utils.http.HttpUtils;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
-import javax.annotation.Resource;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -68,7 +68,7 @@
                     if (tempList.contains(wd.getId())) {
                         continue;
                     }
-                    MqttConsumer.publish(2, false, "stealelec", JSON.toJSONString(wd));
+                    MqttProducer.publish(2, false, "stealelec", JSON.toJSONString(wd));
                     tempList.add(wd.getId());
                 }
             }
diff --git a/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java b/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java
index 06584e6..f15cb3f 100644
--- a/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java
+++ b/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java
@@ -2,7 +2,7 @@
 
 import com.alibaba.fastjson2.JSON;
 import com.ard.utils.other.DateUtils;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
 import com.ard.utils.udp.NettyUdpServer;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
@@ -73,7 +73,7 @@
                 log.debug("浣嶇疆锛�" + position);
                 log.debug("鍊肩彮浜猴細" + watcher);
                 log.debug("缁撴潫瑙f瀽" + stop);
-                MqttConsumer.publish(2, false, "tube", JSON.toJSONString(map));
+                MqttProducer.publish(2, false, "tube", JSON.toJSONString(map));
             } else {
                 log.error("鏁版嵁寮傚父");
             }
diff --git a/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java b/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java
index 62f68de..7deea3e 100644
--- a/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java
+++ b/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java
@@ -14,7 +14,7 @@
 import com.ard.utils.hiksdk.domain.ExternalAlarmEventInfo;
 import com.ard.utils.hiksdk.util.hikSdkUtil.HCNetSDK;
 import com.ard.utils.other.DateUtils;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
 import com.ard.utils.spring.SpringUtils;
 import com.ard.utils.uuid.IdUtils;
 import com.sun.jna.Pointer;
@@ -440,7 +440,7 @@
      */
     private void publishMqtt(CameraEventInfo info) {
         printLog(info);
-        MqttConsumer.publish(2, false, "camera", JSON.toJSONString(info));
+        MqttProducer.publish(2, false, "camera", JSON.toJSONString(info));
     }
 
     /**
@@ -448,7 +448,7 @@
      */
     private void publishMqtt(ExternalAlarmEventInfo info) {
         printLog(info);
-        MqttConsumer.publish(2, false, "external", JSON.toJSONString(info));
+        MqttProducer.publish(2, false, "external", JSON.toJSONString(info));
     }
 
     /**
@@ -456,7 +456,7 @@
      */
     private void publishMqtt(AccessControlHostEventInfo info) {
         printLog(info);
-        MqttConsumer.publish(2, false, "accessControl", JSON.toJSONString(info));
+        MqttProducer.publish(2, false, "accessControl", JSON.toJSONString(info));
     }
 
     /**
diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java b/src/main/java/com/ard/utils/mqtt/MqttProducer.java
similarity index 66%
rename from src/main/java/com/ard/utils/mqtt/MqttConsumer.java
rename to src/main/java/com/ard/utils/mqtt/MqttProducer.java
index fc03b39..aef7240 100644
--- a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java
+++ b/src/main/java/com/ard/utils/mqtt/MqttProducer.java
@@ -1,21 +1,19 @@
 package com.ard.utils.mqtt;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.annotation.Order;
-import org.springframework.expression.spel.ast.NullLiteral;
 import org.springframework.stereotype.Component;
 
 import java.io.UnsupportedEncodingException;
 
 /**
- * @Description: mqtt娑堣垂瀹㈡埛绔�
- * @ClassName: MqttConsumer
+ * @Description: mqtt鐢熶骇瀹㈡埛绔�
+ * @ClassName: MqttProducer
  * @Author: 鍒樿嫃涔�
  * @Date: 2023骞�05鏈�29鏃�9:55
  * @Version: 1.0
@@ -23,11 +21,9 @@
 @Component
 @Slf4j(topic = "mqtt")
 @Order(1)
-public class MqttConsumer implements ApplicationRunner {
+public class MqttProducer implements ApplicationRunner {
     @Value("${spring.mqtt.enabled}")
     private Boolean MQTT_ENABLED;
-    @Value("${spring.mqtt.topic}")
-    private String MQTT_TOPIC;
     @Value("${spring.mqtt.host}")
     private String MQTT_HOST;
     @Value("${spring.mqtt.clientId}")
@@ -60,11 +56,8 @@
             getClient();
             // 2 璁剧疆閰嶇疆
             MqttConnectOptions options = getOptions();
-            String[] topic = MQTT_TOPIC.split(",");
-            // 3 娑堟伅鍙戝竷璐ㄩ噺
-            int[] qos = getQos(topic.length);
-            // 4 鏈�鍚庤缃�
-            create(options, topic, qos);
+            // 3 鏈�鍚庤缃�
+            create(options);
         } catch (Exception e) {
             log.error("mqtt杩炴帴寮傚父锛�" + e);
         }
@@ -97,69 +90,37 @@
         // 璁剧疆浼氳瘽蹇冭烦鏃堕棿
         options.setKeepAliveInterval(MQTT_KEEP_ALIVE);
         // 鏄惁娓呴櫎session
-        options.setCleanSession(true);
+        options.setCleanSession(false);
         log.debug("--鐢熸垚mqtt閰嶇疆瀵硅薄");
         return options;
     }
 
-    /**
-     * qos   --- 3 ---
-     */
-    public int[] getQos(int length) {
-
-        int[] qos = new int[length];
-        for (int i = 0; i < length; i++) {
-            /**
-             *  MQTT鍗忚涓湁涓夌娑堟伅鍙戝竷鏈嶅姟璐ㄩ噺:
-             *
-             * QOS0锛� 鈥滆嚦澶氫竴娆♀�濓紝娑堟伅鍙戝竷瀹屽叏渚濊禆搴曞眰 TCP/IP 缃戠粶銆備細鍙戠敓娑堟伅涓㈠け鎴栭噸澶嶃�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鐜浼犳劅鍣ㄦ暟鎹紝涓㈠け涓�娆¤璁板綍鏃犳墍璋擄紝鍥犱负涓嶄箙鍚庤繕浼氭湁绗簩娆″彂閫併��
-             * QOS1锛� 鈥滆嚦灏戜竴娆♀�濓紝纭繚娑堟伅鍒拌揪锛屼絾娑堟伅閲嶅鍙兘浼氬彂鐢熴��
-             * QOS2锛� 鈥滃彧鏈変竴娆♀�濓紝纭繚娑堟伅鍒拌揪涓�娆°�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鍦ㄨ璐圭郴缁熶腑锛屾秷鎭噸澶嶆垨涓㈠け浼氬鑷翠笉姝g‘鐨勭粨鏋滐紝璧勬簮寮�閿�澶�
-             */
-            qos[i] = 1;
-        }
-        log.debug("--璁剧疆娑堟伅鍙戝竷璐ㄩ噺");
-        return qos;
-    }
 
     /**
-     * 瑁呰浇鍚勭瀹炰緥鍜岃闃呬富棰�  --- 4 ---
+     * 杩炴帴骞惰杞藉洖璋� --- 3 ---
      */
-    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
+    public void create(MqttConnectOptions options) {
         try {
-            client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
+            client.setCallback(new MqttProducerCallback(client, options));
             log.debug("--娣诲姞鍥炶皟澶勭悊绫�");
             client.connect(options);
         } catch (Exception e) {
-            log.info("瑁呰浇瀹炰緥鎴栬闃呬富棰樺紓甯革細" + e);
+            log.info("杩炴帴骞惰杞藉洖璋冨紓甯革細" + e);
         }
     }
 
-    /**
-     * 璁㈤槄鏌愪釜涓婚
-     *
-     * @param topic
-     * @param qos
-     */
-    public void subscribe(String topic, int qos) {
-        try {
-            log.debug("topic:" + topic);
-            client.subscribe(topic, qos);
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
-    }
+
 
     /**
      * 鍙戝竷锛岄潪鎸佷箙鍖�
      * <p>
-     * qos鏍规嵁鏂囨。璁剧疆涓�1
+     * qos鏍规嵁鏂囨。璁剧疆涓�2
      *
      * @param topic
      * @param msg
      */
     public static void publish(String topic, String msg) {
-        publish(1, false, topic, msg);
+        publish(2, false, topic, msg);
     }
 
     /**
diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java b/src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java
similarity index 72%
rename from src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java
rename to src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java
index b0ee239..af6cac1 100644
--- a/src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java
+++ b/src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java
@@ -2,6 +2,7 @@
 
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
+
 import java.util.Arrays;
 
 /**
@@ -12,18 +13,14 @@
  * @Version: 1.0
  **/
 @Slf4j(topic = "mqtt")
-public class MqttConsumerCallback implements MqttCallbackExtended {
+public class MqttProducerCallback implements MqttCallbackExtended {
 
     private MqttClient client;
     private MqttConnectOptions options;
-    private String[] topic;
-    private int[] qos;
 
-    public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
+    public MqttProducerCallback(MqttClient client, MqttConnectOptions options) {
         this.client = client;
         this.options = options;
-        this.topic = topic;
-        this.qos = qos;
     }
 
     /**
@@ -73,22 +70,18 @@
     }
 
     /**
-     * mqtt杩炴帴鍚庤闃呬富棰�
+     * mqtt杩炴帴瀹屾垚
      */
     @Override
     public void connectComplete(boolean b, String s) {
         try {
-            if (null != topic && null != qos) {
-                if (client.isConnected()) {
-                    client.subscribe(topic, qos);
-                    log.debug("mqtt杩炴帴鎴愬姛");
-                    log.debug("--璁㈤槄涓婚:锛�" + Arrays.toString(topic));
-                } else {
-                    log.debug("mqtt杩炴帴澶辫触");
-                }
+            if (client.isConnected()) {
+                log.info("mqtt杩炴帴鎴愬姛");
+            } else {
+                log.info("mqtt杩炴帴澶辫触");
             }
         } catch (Exception e) {
-            log.error("mqtt璁㈤槄涓婚寮傚父:" + e);
+            log.error("mqtt杩炴帴寮傚父:" + e);
         }
     }
 }
diff --git a/src/main/java/com/ard/utils/tcp/ClientHandler.java b/src/main/java/com/ard/utils/tcp/ClientHandler.java
index dd35018..6df4b26 100644
--- a/src/main/java/com/ard/utils/tcp/ClientHandler.java
+++ b/src/main/java/com/ard/utils/tcp/ClientHandler.java
@@ -6,7 +6,7 @@
 import com.ard.alarm.radar.domain.RadarAlarmData;
 import com.ard.utils.other.ByteUtils;
 import com.ard.utils.other.GisUtils;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelId;
@@ -432,10 +432,10 @@
             radarAlarmData.setRadarName(radarName);
             radarAlarmData.setAlarmTime(alarmTime);
             radarAlarmData.setArdAlarmRadars(radarAlarmInfos);
-            MqttConsumer.publish(2, false, "radar", JSON.toJSONString(radarAlarmData));
+            MqttProducer.publish(2, false, "radar", JSON.toJSONString(radarAlarmData));
             //鎶芥补鏈虹姸鎬丮QTT闃熷垪
             radarAlarmData.setArdAlarmRadars(well);
-            MqttConsumer.publish(2, false, "radarWellData", JSON.toJSONString(radarAlarmData));
+            MqttProducer.publish(2, false, "radarWellData", JSON.toJSONString(radarAlarmData));
         } catch (Exception ex) {
             log.error("闆疯揪鎶ユ枃瑙f瀽寮傚父:" + ex.getMessage());
         }
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index 83184ea..a7b1837 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -39,10 +39,9 @@
   # mqtt閰嶇疆
   mqtt:
     host: tcp://192.168.2.15:1883
-    clientId: c227
+    clientId: client-227
     username: admin
     password: admin
-    topic: tube
     timeout: 100
     keepalive: 60
     enabled: true

--
Gitblit v1.9.3