From ad89fdba09b8f0596ed41c5b31de6f34fe1ce620 Mon Sep 17 00:00:00 2001
From: ‘liusuyi’ <1951119284@qq.com>
Date: 星期四, 10 八月 2023 17:20:41 +0800
Subject: [PATCH] 优化行为分析信息报警
---
src/main/java/com/ard/utils/mqtt/MqttConsumer.java | 83 ++++++++++++++++++++++-------------------
1 files changed, 45 insertions(+), 38 deletions(-)
diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java b/src/main/java/com/ard/utils/mqtt/MqttConsumer.java
index 1c07850..6c80d29 100644
--- a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java
+++ b/src/main/java/com/ard/utils/mqtt/MqttConsumer.java
@@ -1,12 +1,17 @@
package com.ard.utils.mqtt;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.expression.spel.ast.NullLiteral;
import org.springframework.stereotype.Component;
+
import java.io.UnsupportedEncodingException;
+
/**
* @Description: mqtt娑堣垂瀹㈡埛绔�
* @ClassName: MqttConsumer
@@ -16,15 +21,15 @@
**/
@Component
@Slf4j(topic = "mqtt")
+@Order(1)
public class MqttConsumer implements ApplicationRunner {
private static MqttClient client;
@Override
public void run(ApplicationArguments args) {
- log.info("鍒濆鍖栧苟鍚姩mqtt......");
- if(PropertiesUtil.MQTT_ENABLED)
- {
+ log.debug("鍒濆鍖栧苟鍚姩mqtt......");
+ if (PropertiesUtil.MQTT_ENABLED) {
this.connect();
}
}
@@ -49,21 +54,21 @@
}
/**
- * 鍒涘缓瀹㈡埛绔� --- 1 ---
+ * 鍒涘缓瀹㈡埛绔� --- 1 ---
*/
public void getClient() {
try {
if (null == client) {
client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());
}
- log.info("--鍒涘缓mqtt瀹㈡埛绔�");
+ log.debug("--鍒涘缓mqtt瀹㈡埛绔�");
} catch (Exception e) {
log.error("鍒涘缓mqtt瀹㈡埛绔紓甯革細" + e);
}
}
/**
- * 鐢熸垚閰嶇疆瀵硅薄锛岀敤鎴峰悕锛屽瘑鐮佺瓑 --- 2 ---
+ * 鐢熸垚閰嶇疆瀵硅薄锛岀敤鎴峰悕锛屽瘑鐮佺瓑 --- 2 ---
*/
public MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
@@ -75,13 +80,13 @@
// 璁剧疆浼氳瘽蹇冭烦鏃堕棿
options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
// 鏄惁娓呴櫎session
- options.setCleanSession(false);
- log.info("--鐢熸垚mqtt閰嶇疆瀵硅薄");
+ options.setCleanSession(true);
+ log.debug("--鐢熸垚mqtt閰嶇疆瀵硅薄");
return options;
}
/**
- * qos --- 3 ---
+ * qos --- 3 ---
*/
public int[] getQos(int length) {
@@ -96,17 +101,17 @@
*/
qos[i] = 1;
}
- log.info("--璁剧疆娑堟伅鍙戝竷璐ㄩ噺");
+ log.debug("--璁剧疆娑堟伅鍙戝竷璐ㄩ噺");
return qos;
}
/**
- * 瑁呰浇鍚勭瀹炰緥鍜岃闃呬富棰� --- 4 ---
+ * 瑁呰浇鍚勭瀹炰緥鍜岃闃呬富棰� --- 4 ---
*/
public void create(MqttConnectOptions options, String[] topic, int[] qos) {
try {
client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
- log.info("--娣诲姞鍥炶皟澶勭悊绫�");
+ log.debug("--娣诲姞鍥炶皟澶勭悊绫�");
client.connect(options);
} catch (Exception e) {
log.info("瑁呰浇瀹炰緥鎴栬闃呬富棰樺紓甯革細" + e);
@@ -121,7 +126,7 @@
*/
public void subscribe(String topic, int qos) {
try {
- log.info("topic:" + topic);
+ log.debug("topic:" + topic);
client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
@@ -130,8 +135,8 @@
/**
* 鍙戝竷锛岄潪鎸佷箙鍖�
- *
- * qos鏍规嵁鏂囨。璁剧疆涓�1
+ * <p>
+ * qos鏍规嵁鏂囨。璁剧疆涓�1
*
* @param topic
* @param msg
@@ -144,30 +149,32 @@
* 鍙戝竷
*/
public static void publish(int qos, boolean retained, String topic, String pushMessage) {
- log.info("銆愪富棰樸��:" + topic + "銆恞os銆�:" + qos + "銆恜ushMessage銆�:" + pushMessage);
- MqttMessage message = new MqttMessage();
- message.setQos(qos);
- message.setRetained(retained);
- try {
- message.setPayload(pushMessage.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e) {
- log.error("mqtt缂栫爜寮傚父锛�" + e.getMessage());
- }
- MqttTopic mTopic = client.getTopic(topic);
- if (null == mTopic) {
- log.error("topic锛�" + topic + " 涓嶅瓨鍦�");
- }
- MqttDeliveryToken token;
- try {
- token = mTopic.publish(message);
- token.waitForCompletion();
- if (token.isComplete()) {
- log.info("娑堟伅鍙戦�佹垚鍔�");
+ if (client != null) {
+ log.debug("銆愪富棰樸��:" + topic + "銆恞os銆�:" + qos + "銆恜ushMessage銆�:" + pushMessage);
+ MqttMessage message = new MqttMessage();
+ message.setQos(qos);
+ message.setRetained(retained);
+ try {
+ message.setPayload(pushMessage.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ log.error("mqtt缂栫爜寮傚父锛�" + e.getMessage());
}
- } catch (MqttPersistenceException e) {
- e.printStackTrace();
- } catch (MqttException e) {
- e.printStackTrace();
+ MqttTopic mTopic = client.getTopic(topic);
+ if (null == mTopic) {
+ log.error("topic锛�" + topic + " 涓嶅瓨鍦�");
+ }
+ MqttDeliveryToken token;
+ try {
+ token = mTopic.publish(message);
+ token.waitForCompletion();
+ if (token.isComplete()) {
+ log.debug("娑堟伅鍙戦�佹垚鍔�");
+ }
+ } catch (MqttPersistenceException e) {
+ log.error("mqtt鎸佷箙寮傚父锛�" + e.getMessage());
+ } catch (MqttException e) {
+ log.error("mqtt寮傚父锛�" + e.getMessage());
+ }
}
}
}
--
Gitblit v1.9.3