From 157c26f5188c7ed62a4547f7e3b5a5a3e3ed7729 Mon Sep 17 00:00:00 2001 From: ‘liusuyi’ <1951119284@qq.com> Date: 星期日, 08 十月 2023 09:40:47 +0800 Subject: [PATCH] 优化mqtt生产者取消消费者订阅 --- src/main/java/com/ard/utils/mqtt/MqttProducer.java | 65 ++++++-------------------------- 1 files changed, 13 insertions(+), 52 deletions(-) diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java b/src/main/java/com/ard/utils/mqtt/MqttProducer.java similarity index 66% rename from src/main/java/com/ard/utils/mqtt/MqttConsumer.java rename to src/main/java/com/ard/utils/mqtt/MqttProducer.java index fc03b39..aef7240 100644 --- a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java +++ b/src/main/java/com/ard/utils/mqtt/MqttProducer.java @@ -1,21 +1,19 @@ package com.ard.utils.mqtt; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; -import org.springframework.expression.spel.ast.NullLiteral; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; /** - * @Description: mqtt娑堣垂瀹㈡埛绔� - * @ClassName: MqttConsumer + * @Description: mqtt鐢熶骇瀹㈡埛绔� + * @ClassName: MqttProducer * @Author: 鍒樿嫃涔� * @Date: 2023骞�05鏈�29鏃�9:55 * @Version: 1.0 @@ -23,11 +21,9 @@ @Component @Slf4j(topic = "mqtt") @Order(1) -public class MqttConsumer implements ApplicationRunner { +public class MqttProducer implements ApplicationRunner { @Value("${spring.mqtt.enabled}") private Boolean MQTT_ENABLED; - @Value("${spring.mqtt.topic}") - private String MQTT_TOPIC; @Value("${spring.mqtt.host}") private String MQTT_HOST; @Value("${spring.mqtt.clientId}") @@ -60,11 +56,8 @@ getClient(); // 2 璁剧疆閰嶇疆 MqttConnectOptions options = getOptions(); - String[] topic = MQTT_TOPIC.split(","); - // 3 娑堟伅鍙戝竷璐ㄩ噺 - int[] qos = getQos(topic.length); - // 4 鏈�鍚庤缃� - create(options, topic, qos); + // 3 鏈�鍚庤缃� + create(options); } catch (Exception e) { log.error("mqtt杩炴帴寮傚父锛�" + e); } @@ -97,69 +90,37 @@ // 璁剧疆浼氳瘽蹇冭烦鏃堕棿 options.setKeepAliveInterval(MQTT_KEEP_ALIVE); // 鏄惁娓呴櫎session - options.setCleanSession(true); + options.setCleanSession(false); log.debug("--鐢熸垚mqtt閰嶇疆瀵硅薄"); return options; } - /** - * qos --- 3 --- - */ - public int[] getQos(int length) { - - int[] qos = new int[length]; - for (int i = 0; i < length; i++) { - /** - * MQTT鍗忚涓湁涓夌娑堟伅鍙戝竷鏈嶅姟璐ㄩ噺: - * - * QOS0锛� 鈥滆嚦澶氫竴娆♀�濓紝娑堟伅鍙戝竷瀹屽叏渚濊禆搴曞眰 TCP/IP 缃戠粶銆備細鍙戠敓娑堟伅涓㈠け鎴栭噸澶嶃�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鐜浼犳劅鍣ㄦ暟鎹紝涓㈠け涓�娆¤璁板綍鏃犳墍璋擄紝鍥犱负涓嶄箙鍚庤繕浼氭湁绗簩娆″彂閫併�� - * QOS1锛� 鈥滆嚦灏戜竴娆♀�濓紝纭繚娑堟伅鍒拌揪锛屼絾娑堟伅閲嶅鍙兘浼氬彂鐢熴�� - * QOS2锛� 鈥滃彧鏈変竴娆♀�濓紝纭繚娑堟伅鍒拌揪涓�娆°�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鍦ㄨ璐圭郴缁熶腑锛屾秷鎭噸澶嶆垨涓㈠け浼氬鑷翠笉姝g‘鐨勭粨鏋滐紝璧勬簮寮�閿�澶� - */ - qos[i] = 1; - } - log.debug("--璁剧疆娑堟伅鍙戝竷璐ㄩ噺"); - return qos; - } /** - * 瑁呰浇鍚勭瀹炰緥鍜岃闃呬富棰� --- 4 --- + * 杩炴帴骞惰杞藉洖璋� --- 3 --- */ - public void create(MqttConnectOptions options, String[] topic, int[] qos) { + public void create(MqttConnectOptions options) { try { - client.setCallback(new MqttConsumerCallback(client, options, topic, qos)); + client.setCallback(new MqttProducerCallback(client, options)); log.debug("--娣诲姞鍥炶皟澶勭悊绫�"); client.connect(options); } catch (Exception e) { - log.info("瑁呰浇瀹炰緥鎴栬闃呬富棰樺紓甯革細" + e); + log.info("杩炴帴骞惰杞藉洖璋冨紓甯革細" + e); } } - /** - * 璁㈤槄鏌愪釜涓婚 - * - * @param topic - * @param qos - */ - public void subscribe(String topic, int qos) { - try { - log.debug("topic:" + topic); - client.subscribe(topic, qos); - } catch (MqttException e) { - e.printStackTrace(); - } - } + /** * 鍙戝竷锛岄潪鎸佷箙鍖� * <p> - * qos鏍规嵁鏂囨。璁剧疆涓�1 + * qos鏍规嵁鏂囨。璁剧疆涓�2 * * @param topic * @param msg */ public static void publish(String topic, String msg) { - publish(1, false, topic, msg); + publish(2, false, topic, msg); } /** -- Gitblit v1.9.3