From 157c26f5188c7ed62a4547f7e3b5a5a3e3ed7729 Mon Sep 17 00:00:00 2001
From: ‘liusuyi’ <1951119284@qq.com>
Date: 星期日, 08 十月 2023 09:40:47 +0800
Subject: [PATCH] 优化mqtt生产者取消消费者订阅
---
src/main/java/com/ard/utils/mqtt/MqttProducer.java | 65 ++++++--------------------------
1 files changed, 13 insertions(+), 52 deletions(-)
diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java b/src/main/java/com/ard/utils/mqtt/MqttProducer.java
similarity index 66%
rename from src/main/java/com/ard/utils/mqtt/MqttConsumer.java
rename to src/main/java/com/ard/utils/mqtt/MqttProducer.java
index fc03b39..aef7240 100644
--- a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java
+++ b/src/main/java/com/ard/utils/mqtt/MqttProducer.java
@@ -1,21 +1,19 @@
package com.ard.utils.mqtt;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
-import org.springframework.expression.spel.ast.NullLiteral;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
- * @Description: mqtt娑堣垂瀹㈡埛绔�
- * @ClassName: MqttConsumer
+ * @Description: mqtt鐢熶骇瀹㈡埛绔�
+ * @ClassName: MqttProducer
* @Author: 鍒樿嫃涔�
* @Date: 2023骞�05鏈�29鏃�9:55
* @Version: 1.0
@@ -23,11 +21,9 @@
@Component
@Slf4j(topic = "mqtt")
@Order(1)
-public class MqttConsumer implements ApplicationRunner {
+public class MqttProducer implements ApplicationRunner {
@Value("${spring.mqtt.enabled}")
private Boolean MQTT_ENABLED;
- @Value("${spring.mqtt.topic}")
- private String MQTT_TOPIC;
@Value("${spring.mqtt.host}")
private String MQTT_HOST;
@Value("${spring.mqtt.clientId}")
@@ -60,11 +56,8 @@
getClient();
// 2 璁剧疆閰嶇疆
MqttConnectOptions options = getOptions();
- String[] topic = MQTT_TOPIC.split(",");
- // 3 娑堟伅鍙戝竷璐ㄩ噺
- int[] qos = getQos(topic.length);
- // 4 鏈�鍚庤缃�
- create(options, topic, qos);
+ // 3 鏈�鍚庤缃�
+ create(options);
} catch (Exception e) {
log.error("mqtt杩炴帴寮傚父锛�" + e);
}
@@ -97,69 +90,37 @@
// 璁剧疆浼氳瘽蹇冭烦鏃堕棿
options.setKeepAliveInterval(MQTT_KEEP_ALIVE);
// 鏄惁娓呴櫎session
- options.setCleanSession(true);
+ options.setCleanSession(false);
log.debug("--鐢熸垚mqtt閰嶇疆瀵硅薄");
return options;
}
- /**
- * qos --- 3 ---
- */
- public int[] getQos(int length) {
-
- int[] qos = new int[length];
- for (int i = 0; i < length; i++) {
- /**
- * MQTT鍗忚涓湁涓夌娑堟伅鍙戝竷鏈嶅姟璐ㄩ噺:
- *
- * QOS0锛� 鈥滆嚦澶氫竴娆♀�濓紝娑堟伅鍙戝竷瀹屽叏渚濊禆搴曞眰 TCP/IP 缃戠粶銆備細鍙戠敓娑堟伅涓㈠け鎴栭噸澶嶃�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鐜浼犳劅鍣ㄦ暟鎹紝涓㈠け涓�娆¤璁板綍鏃犳墍璋擄紝鍥犱负涓嶄箙鍚庤繕浼氭湁绗簩娆″彂閫併��
- * QOS1锛� 鈥滆嚦灏戜竴娆♀�濓紝纭繚娑堟伅鍒拌揪锛屼絾娑堟伅閲嶅鍙兘浼氬彂鐢熴��
- * QOS2锛� 鈥滃彧鏈変竴娆♀�濓紝纭繚娑堟伅鍒拌揪涓�娆°�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鍦ㄨ璐圭郴缁熶腑锛屾秷鎭噸澶嶆垨涓㈠け浼氬鑷翠笉姝g‘鐨勭粨鏋滐紝璧勬簮寮�閿�澶�
- */
- qos[i] = 1;
- }
- log.debug("--璁剧疆娑堟伅鍙戝竷璐ㄩ噺");
- return qos;
- }
/**
- * 瑁呰浇鍚勭瀹炰緥鍜岃闃呬富棰� --- 4 ---
+ * 杩炴帴骞惰杞藉洖璋� --- 3 ---
*/
- public void create(MqttConnectOptions options, String[] topic, int[] qos) {
+ public void create(MqttConnectOptions options) {
try {
- client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
+ client.setCallback(new MqttProducerCallback(client, options));
log.debug("--娣诲姞鍥炶皟澶勭悊绫�");
client.connect(options);
} catch (Exception e) {
- log.info("瑁呰浇瀹炰緥鎴栬闃呬富棰樺紓甯革細" + e);
+ log.info("杩炴帴骞惰杞藉洖璋冨紓甯革細" + e);
}
}
- /**
- * 璁㈤槄鏌愪釜涓婚
- *
- * @param topic
- * @param qos
- */
- public void subscribe(String topic, int qos) {
- try {
- log.debug("topic:" + topic);
- client.subscribe(topic, qos);
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
+
/**
* 鍙戝竷锛岄潪鎸佷箙鍖�
* <p>
- * qos鏍规嵁鏂囨。璁剧疆涓�1
+ * qos鏍规嵁鏂囨。璁剧疆涓�2
*
* @param topic
* @param msg
*/
public static void publish(String topic, String msg) {
- publish(1, false, topic, msg);
+ publish(2, false, topic, msg);
}
/**
--
Gitblit v1.9.3