From 9aa66699aaa610be66f5bd0c22e90cae114311f9 Mon Sep 17 00:00:00 2001
From: ‘liusuyi’ <1951119284@qq.com>
Date: 星期五, 07 七月 2023 17:15:16 +0800
Subject: [PATCH] 优化外联设备报警解析并上传mqtt
---
src/main/java/com/ard/utils/mqtt/MqttConsumer.java | 69 +++++++++++++++++++---------------
1 files changed, 38 insertions(+), 31 deletions(-)
diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java b/src/main/java/com/ard/utils/mqtt/MqttConsumer.java
index ea2b31a..55af6ec 100644
--- a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java
+++ b/src/main/java/com/ard/utils/mqtt/MqttConsumer.java
@@ -1,12 +1,17 @@
package com.ard.utils.mqtt;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.expression.spel.ast.NullLiteral;
import org.springframework.stereotype.Component;
+
import java.io.UnsupportedEncodingException;
+
/**
* @Description: mqtt娑堣垂瀹㈡埛绔�
* @ClassName: MqttConsumer
@@ -16,6 +21,7 @@
**/
@Component
@Slf4j(topic = "mqtt")
+@Order(1)
public class MqttConsumer implements ApplicationRunner {
private static MqttClient client;
@@ -23,8 +29,7 @@
@Override
public void run(ApplicationArguments args) {
log.info("鍒濆鍖栧苟鍚姩mqtt......");
- if(PropertiesUtil.MQTT_ENABLED)
- {
+ if (PropertiesUtil.MQTT_ENABLED) {
this.connect();
}
}
@@ -49,7 +54,7 @@
}
/**
- * 鍒涘缓瀹㈡埛绔� --- 1 ---
+ * 鍒涘缓瀹㈡埛绔� --- 1 ---
*/
public void getClient() {
try {
@@ -63,7 +68,7 @@
}
/**
- * 鐢熸垚閰嶇疆瀵硅薄锛岀敤鎴峰悕锛屽瘑鐮佺瓑 --- 2 ---
+ * 鐢熸垚閰嶇疆瀵硅薄锛岀敤鎴峰悕锛屽瘑鐮佺瓑 --- 2 ---
*/
public MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
@@ -81,7 +86,7 @@
}
/**
- * qos --- 3 ---
+ * qos --- 3 ---
*/
public int[] getQos(int length) {
@@ -101,7 +106,7 @@
}
/**
- * 瑁呰浇鍚勭瀹炰緥鍜岃闃呬富棰� --- 4 ---
+ * 瑁呰浇鍚勭瀹炰緥鍜岃闃呬富棰� --- 4 ---
*/
public void create(MqttConnectOptions options, String[] topic, int[] qos) {
try {
@@ -130,8 +135,8 @@
/**
* 鍙戝竷锛岄潪鎸佷箙鍖�
- *
- * qos鏍规嵁鏂囨。璁剧疆涓�1
+ * <p>
+ * qos鏍规嵁鏂囨。璁剧疆涓�1
*
* @param topic
* @param msg
@@ -144,30 +149,32 @@
* 鍙戝竷
*/
public static void publish(int qos, boolean retained, String topic, String pushMessage) {
- log.info("銆愪富棰樸��:" + topic + "銆恞os銆�:" + qos + "銆恜ushMessage銆�:" + pushMessage);
- MqttMessage message = new MqttMessage();
- message.setQos(qos);
- message.setRetained(retained);
- try {
- message.setPayload(pushMessage.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e) {
- log.error("mqtt缂栫爜寮傚父锛�" + e.getMessage());
- }
- MqttTopic mTopic = client.getTopic(topic);
- if (null == mTopic) {
- log.error("topic锛�" + topic + " 涓嶅瓨鍦�");
- }
- MqttDeliveryToken token;
- try {
- token = mTopic.publish(message);
- token.waitForCompletion();
- if (token.isComplete()) {
- log.info("娑堟伅鍙戦�佹垚鍔�");
+ if (client != null) {
+ log.info("銆愪富棰樸��:" + topic + "銆恞os銆�:" + qos + "銆恜ushMessage銆�:" + pushMessage);
+ MqttMessage message = new MqttMessage();
+ message.setQos(qos);
+ message.setRetained(retained);
+ try {
+ message.setPayload(pushMessage.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ log.error("mqtt缂栫爜寮傚父锛�" + e.getMessage());
}
- } catch (MqttPersistenceException e) {
- e.printStackTrace();
- } catch (MqttException e) {
- e.printStackTrace();
+ MqttTopic mTopic = client.getTopic(topic);
+ if (null == mTopic) {
+ log.error("topic锛�" + topic + " 涓嶅瓨鍦�");
+ }
+ MqttDeliveryToken token;
+ try {
+ token = mTopic.publish(message);
+ token.waitForCompletion();
+ if (token.isComplete()) {
+ log.info("娑堟伅鍙戦�佹垚鍔�");
+ }
+ } catch (MqttPersistenceException e) {
+ log.error("mqtt鎸佷箙寮傚父锛�" + e.getMessage());
+ } catch (MqttException e) {
+ log.error("mqtt寮傚父锛�" + e.getMessage());
+ }
}
}
}
--
Gitblit v1.9.3