From 157c26f5188c7ed62a4547f7e3b5a5a3e3ed7729 Mon Sep 17 00:00:00 2001
From: ‘liusuyi’ <1951119284@qq.com>
Date: 星期日, 08 十月 2023 09:40:47 +0800
Subject: [PATCH] 优化mqtt生产者取消消费者订阅

---
 src/main/java/com/ard/utils/mqtt/MqttProducer.java |   65 ++++++--------------------------
 1 files changed, 13 insertions(+), 52 deletions(-)

diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java b/src/main/java/com/ard/utils/mqtt/MqttProducer.java
similarity index 66%
rename from src/main/java/com/ard/utils/mqtt/MqttConsumer.java
rename to src/main/java/com/ard/utils/mqtt/MqttProducer.java
index fc03b39..aef7240 100644
--- a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java
+++ b/src/main/java/com/ard/utils/mqtt/MqttProducer.java
@@ -1,21 +1,19 @@
 package com.ard.utils.mqtt;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.annotation.Order;
-import org.springframework.expression.spel.ast.NullLiteral;
 import org.springframework.stereotype.Component;
 
 import java.io.UnsupportedEncodingException;
 
 /**
- * @Description: mqtt娑堣垂瀹㈡埛绔�
- * @ClassName: MqttConsumer
+ * @Description: mqtt鐢熶骇瀹㈡埛绔�
+ * @ClassName: MqttProducer
  * @Author: 鍒樿嫃涔�
  * @Date: 2023骞�05鏈�29鏃�9:55
  * @Version: 1.0
@@ -23,11 +21,9 @@
 @Component
 @Slf4j(topic = "mqtt")
 @Order(1)
-public class MqttConsumer implements ApplicationRunner {
+public class MqttProducer implements ApplicationRunner {
     @Value("${spring.mqtt.enabled}")
     private Boolean MQTT_ENABLED;
-    @Value("${spring.mqtt.topic}")
-    private String MQTT_TOPIC;
     @Value("${spring.mqtt.host}")
     private String MQTT_HOST;
     @Value("${spring.mqtt.clientId}")
@@ -60,11 +56,8 @@
             getClient();
             // 2 璁剧疆閰嶇疆
             MqttConnectOptions options = getOptions();
-            String[] topic = MQTT_TOPIC.split(",");
-            // 3 娑堟伅鍙戝竷璐ㄩ噺
-            int[] qos = getQos(topic.length);
-            // 4 鏈�鍚庤缃�
-            create(options, topic, qos);
+            // 3 鏈�鍚庤缃�
+            create(options);
         } catch (Exception e) {
             log.error("mqtt杩炴帴寮傚父锛�" + e);
         }
@@ -97,69 +90,37 @@
         // 璁剧疆浼氳瘽蹇冭烦鏃堕棿
         options.setKeepAliveInterval(MQTT_KEEP_ALIVE);
         // 鏄惁娓呴櫎session
-        options.setCleanSession(true);
+        options.setCleanSession(false);
         log.debug("--鐢熸垚mqtt閰嶇疆瀵硅薄");
         return options;
     }
 
-    /**
-     * qos   --- 3 ---
-     */
-    public int[] getQos(int length) {
-
-        int[] qos = new int[length];
-        for (int i = 0; i < length; i++) {
-            /**
-             *  MQTT鍗忚涓湁涓夌娑堟伅鍙戝竷鏈嶅姟璐ㄩ噺:
-             *
-             * QOS0锛� 鈥滆嚦澶氫竴娆♀�濓紝娑堟伅鍙戝竷瀹屽叏渚濊禆搴曞眰 TCP/IP 缃戠粶銆備細鍙戠敓娑堟伅涓㈠け鎴栭噸澶嶃�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鐜浼犳劅鍣ㄦ暟鎹紝涓㈠け涓�娆¤璁板綍鏃犳墍璋擄紝鍥犱负涓嶄箙鍚庤繕浼氭湁绗簩娆″彂閫併��
-             * QOS1锛� 鈥滆嚦灏戜竴娆♀�濓紝纭繚娑堟伅鍒拌揪锛屼絾娑堟伅閲嶅鍙兘浼氬彂鐢熴��
-             * QOS2锛� 鈥滃彧鏈変竴娆♀�濓紝纭繚娑堟伅鍒拌揪涓�娆°�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鍦ㄨ璐圭郴缁熶腑锛屾秷鎭噸澶嶆垨涓㈠け浼氬鑷翠笉姝g‘鐨勭粨鏋滐紝璧勬簮寮�閿�澶�
-             */
-            qos[i] = 1;
-        }
-        log.debug("--璁剧疆娑堟伅鍙戝竷璐ㄩ噺");
-        return qos;
-    }
 
     /**
-     * 瑁呰浇鍚勭瀹炰緥鍜岃闃呬富棰�  --- 4 ---
+     * 杩炴帴骞惰杞藉洖璋� --- 3 ---
      */
-    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
+    public void create(MqttConnectOptions options) {
         try {
-            client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
+            client.setCallback(new MqttProducerCallback(client, options));
             log.debug("--娣诲姞鍥炶皟澶勭悊绫�");
             client.connect(options);
         } catch (Exception e) {
-            log.info("瑁呰浇瀹炰緥鎴栬闃呬富棰樺紓甯革細" + e);
+            log.info("杩炴帴骞惰杞藉洖璋冨紓甯革細" + e);
         }
     }
 
-    /**
-     * 璁㈤槄鏌愪釜涓婚
-     *
-     * @param topic
-     * @param qos
-     */
-    public void subscribe(String topic, int qos) {
-        try {
-            log.debug("topic:" + topic);
-            client.subscribe(topic, qos);
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
-    }
+
 
     /**
      * 鍙戝竷锛岄潪鎸佷箙鍖�
      * <p>
-     * qos鏍规嵁鏂囨。璁剧疆涓�1
+     * qos鏍规嵁鏂囨。璁剧疆涓�2
      *
      * @param topic
      * @param msg
      */
     public static void publish(String topic, String msg) {
-        publish(1, false, topic, msg);
+        publish(2, false, topic, msg);
     }
 
     /**

--
Gitblit v1.9.3