From 38f29e38fcc668171dc05c53d40a36b895c86102 Mon Sep 17 00:00:00 2001 From: liusuyi <1951119284@qq.com> Date: 星期四, 10 十月 2024 13:34:28 +0800 Subject: [PATCH] init --- ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumer.java | 55 ++++++++++++++++++++++++++++++++++++++----------------- 1 files changed, 38 insertions(+), 17 deletions(-) diff --git a/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumer.java b/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumer.java index 3c36909..35b6dad 100644 --- a/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumer.java +++ b/ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumer.java @@ -3,8 +3,10 @@ import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; /** @@ -16,14 +18,33 @@ **/ @Component @Slf4j(topic = "mqtt") +@Order(1) public class MqttConsumer implements ApplicationRunner { + + @Value("${mqtt.enabled}") + private Boolean MQTT_ENABLED; + @Value("${mqtt.topic}") + private String MQTT_TOPIC; + @Value("${mqtt.host}") + private String MQTT_HOST; + @Value("${mqtt.clientId}") + private String MQTT_CLIENT_ID; + @Value("${mqtt.username}") + private String MQTT_USER_NAME; + @Value("${mqtt.password}") + private String MQTT_PASSWORD; + @Value("${mqtt.timeout}") + private int MQTT_TIMEOUT; + @Value("${mqtt.keepalive}") + private int MQTT_KEEP_ALIVE; private static MqttClient client; + @Override public void run(ApplicationArguments args) { - log.info("鍒濆鍖栧苟鍚姩mqtt......"); - if(PropertiesUtil.MQTT_ENABLED) + log.debug("鍒濆鍖栧苟鍚姩mqtt......"); + if(MQTT_ENABLED) { this.connect(); } @@ -38,7 +59,7 @@ getClient(); // 2 璁剧疆閰嶇疆 MqttConnectOptions options = getOptions(); - String[] topic = PropertiesUtil.MQTT_TOPIC.split(","); + String[] topic = MQTT_TOPIC.split(","); // 3 娑堟伅鍙戝竷璐ㄩ噺 int[] qos = getQos(topic.length); // 4 鏈�鍚庤缃� @@ -54,9 +75,9 @@ public void getClient() { try { if (null == client) { - client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence()); + client = new MqttClient(MQTT_HOST, MQTT_CLIENT_ID, new MemoryPersistence()); } - log.info("--鍒涘缓mqtt瀹㈡埛绔�"); + log.debug("--鍒涘缓mqtt瀹㈡埛绔�"); } catch (Exception e) { log.error("鍒涘缓mqtt瀹㈡埛绔紓甯革細" + e); } @@ -68,15 +89,15 @@ public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); //璁剧疆鐢ㄦ埛鍚嶅瘑鐮� - options.setUserName(PropertiesUtil.MQTT_USER_NAME); - options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray()); + options.setUserName(MQTT_USER_NAME); + options.setPassword(MQTT_PASSWORD.toCharArray()); // 璁剧疆瓒呮椂鏃堕棿 - options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT); + options.setConnectionTimeout(MQTT_TIMEOUT); // 璁剧疆浼氳瘽蹇冭烦鏃堕棿 - options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE); + options.setKeepAliveInterval(MQTT_KEEP_ALIVE); // 鏄惁娓呴櫎session - options.setCleanSession(false); - log.info("--鐢熸垚mqtt閰嶇疆瀵硅薄"); + options.setCleanSession(true); + log.debug("--鐢熸垚mqtt閰嶇疆瀵硅薄"); return options; } @@ -96,7 +117,7 @@ */ qos[i] = 1; } - log.info("--璁剧疆娑堟伅鍙戝竷璐ㄩ噺"); + log.debug("--璁剧疆娑堟伅鍙戝竷璐ㄩ噺"); return qos; } @@ -106,10 +127,10 @@ public void create(MqttConnectOptions options, String[] topic, int[] qos) { try { client.setCallback(new MqttConsumerCallback(client, options, topic, qos)); - log.info("--娣诲姞鍥炶皟澶勭悊绫�"); + log.debug("--娣诲姞鍥炶皟澶勭悊绫�"); client.connect(options); } catch (Exception e) { - log.info("瑁呰浇瀹炰緥鎴栬闃呬富棰樺紓甯革細" + e); + log.error("瑁呰浇瀹炰緥鎴栬闃呬富棰樺紓甯革細" + e); } } @@ -121,7 +142,7 @@ */ public void subscribe(String topic, int qos) { try { - log.info("topic:" + topic); + log.debug("topic:" + topic); client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); @@ -144,7 +165,7 @@ * 鍙戝竷 */ public static void publish(int qos, boolean retained, String topic, String pushMessage) { - log.info("銆愪富棰樸��:" + topic + "銆恞os銆�:" + qos + "銆恜ushMessage銆�:" + pushMessage); + log.debug("銆愪富棰樸��:" + topic + "銆恞os銆�:" + qos + "銆恜ushMessage銆�:" + pushMessage); MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); @@ -162,7 +183,7 @@ token = mTopic.publish(message); token.waitForCompletion(); if (token.isComplete()) { - log.info("娑堟伅鍙戦�佹垚鍔�"); + log.debug("娑堟伅鍙戦�佹垚鍔�"); } } catch (MqttPersistenceException e) { e.printStackTrace(); -- Gitblit v1.9.3