From 157c26f5188c7ed62a4547f7e3b5a5a3e3ed7729 Mon Sep 17 00:00:00 2001
From: ‘liusuyi’ <1951119284@qq.com>
Date: 星期日, 08 十月 2023 09:40:47 +0800
Subject: [PATCH] 优化mqtt生产者取消消费者订阅
---
src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java | 5 -
src/main/java/com/ard/utils/tcp/ClientHandler.java | 6 +-
src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java | 6 +-
src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java | 8 +-
src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java | 25 +++-----
src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java | 4
src/main/resources/application-dev.yml | 3
src/main/java/com/ard/utils/mqtt/MqttProducer.java | 65 ++++-----------------
src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java | 4
9 files changed, 39 insertions(+), 87 deletions(-)
diff --git a/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java b/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java
index f002e47..dba5f71 100644
--- a/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java
+++ b/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java
@@ -1,10 +1,9 @@
package com.ard.alarm.apponekey.service.impl;
import java.util.Date;
-import java.util.List;
import com.alibaba.fastjson2.JSON;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
import com.ard.utils.uuid.IdUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -34,7 +33,7 @@
ardAlarmApponekey.setId(IdUtils.fastSimpleUUID());
ardAlarmApponekey.setCreateTime(new Date());
printLog(ardAlarmApponekey);
- MqttConsumer.publish(2, false, "apponekey", JSON.toJSONString(ardAlarmApponekey));
+ MqttProducer.publish(2, false, "apponekey", JSON.toJSONString(ardAlarmApponekey));
return 1;
} catch (Exception ex) {
log.error("涓�閿姤璀﹀鐞嗗紓甯�:" + ex.getMessage());
diff --git a/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java b/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java
index 981f2a8..8b04e6f 100644
--- a/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java
+++ b/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java
@@ -4,7 +4,7 @@
import com.ard.alarm.digitization.model.DataBridge;
import com.ard.alarm.digitization.service.DataBridgeService;
import com.ard.utils.jdbc.Query;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
@@ -52,7 +52,7 @@
data.setWellNo(elem[0]);
data.setAlarmType(elem[1]);
data.setAlarmTime(elem[2]);
- MqttConsumer.publish(2, false, "digitization3", JSON.toJSONString(data));
+ MqttProducer.publish(2, false, "digitization3", JSON.toJSONString(data));
}
}
} catch (Exception ex) {
diff --git a/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java b/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java
index ced8991..2714637 100644
--- a/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java
+++ b/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java
@@ -4,13 +4,13 @@
import com.alibaba.fastjson2.JSONObject;
import com.ard.alarm.stealelec.domain.ArdAlarmStealelec;
import com.ard.utils.http.HttpUtils;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
-import javax.annotation.Resource;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -68,7 +68,7 @@
if (tempList.contains(wd.getId())) {
continue;
}
- MqttConsumer.publish(2, false, "stealelec", JSON.toJSONString(wd));
+ MqttProducer.publish(2, false, "stealelec", JSON.toJSONString(wd));
tempList.add(wd.getId());
}
}
diff --git a/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java b/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java
index 06584e6..f15cb3f 100644
--- a/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java
+++ b/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java
@@ -2,7 +2,7 @@
import com.alibaba.fastjson2.JSON;
import com.ard.utils.other.DateUtils;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
import com.ard.utils.udp.NettyUdpServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@@ -73,7 +73,7 @@
log.debug("浣嶇疆锛�" + position);
log.debug("鍊肩彮浜猴細" + watcher);
log.debug("缁撴潫瑙f瀽" + stop);
- MqttConsumer.publish(2, false, "tube", JSON.toJSONString(map));
+ MqttProducer.publish(2, false, "tube", JSON.toJSONString(map));
} else {
log.error("鏁版嵁寮傚父");
}
diff --git a/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java b/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java
index 62f68de..7deea3e 100644
--- a/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java
+++ b/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java
@@ -14,7 +14,7 @@
import com.ard.utils.hiksdk.domain.ExternalAlarmEventInfo;
import com.ard.utils.hiksdk.util.hikSdkUtil.HCNetSDK;
import com.ard.utils.other.DateUtils;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
import com.ard.utils.spring.SpringUtils;
import com.ard.utils.uuid.IdUtils;
import com.sun.jna.Pointer;
@@ -440,7 +440,7 @@
*/
private void publishMqtt(CameraEventInfo info) {
printLog(info);
- MqttConsumer.publish(2, false, "camera", JSON.toJSONString(info));
+ MqttProducer.publish(2, false, "camera", JSON.toJSONString(info));
}
/**
@@ -448,7 +448,7 @@
*/
private void publishMqtt(ExternalAlarmEventInfo info) {
printLog(info);
- MqttConsumer.publish(2, false, "external", JSON.toJSONString(info));
+ MqttProducer.publish(2, false, "external", JSON.toJSONString(info));
}
/**
@@ -456,7 +456,7 @@
*/
private void publishMqtt(AccessControlHostEventInfo info) {
printLog(info);
- MqttConsumer.publish(2, false, "accessControl", JSON.toJSONString(info));
+ MqttProducer.publish(2, false, "accessControl", JSON.toJSONString(info));
}
/**
diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java b/src/main/java/com/ard/utils/mqtt/MqttProducer.java
similarity index 66%
rename from src/main/java/com/ard/utils/mqtt/MqttConsumer.java
rename to src/main/java/com/ard/utils/mqtt/MqttProducer.java
index fc03b39..aef7240 100644
--- a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java
+++ b/src/main/java/com/ard/utils/mqtt/MqttProducer.java
@@ -1,21 +1,19 @@
package com.ard.utils.mqtt;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
-import org.springframework.expression.spel.ast.NullLiteral;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
- * @Description: mqtt娑堣垂瀹㈡埛绔�
- * @ClassName: MqttConsumer
+ * @Description: mqtt鐢熶骇瀹㈡埛绔�
+ * @ClassName: MqttProducer
* @Author: 鍒樿嫃涔�
* @Date: 2023骞�05鏈�29鏃�9:55
* @Version: 1.0
@@ -23,11 +21,9 @@
@Component
@Slf4j(topic = "mqtt")
@Order(1)
-public class MqttConsumer implements ApplicationRunner {
+public class MqttProducer implements ApplicationRunner {
@Value("${spring.mqtt.enabled}")
private Boolean MQTT_ENABLED;
- @Value("${spring.mqtt.topic}")
- private String MQTT_TOPIC;
@Value("${spring.mqtt.host}")
private String MQTT_HOST;
@Value("${spring.mqtt.clientId}")
@@ -60,11 +56,8 @@
getClient();
// 2 璁剧疆閰嶇疆
MqttConnectOptions options = getOptions();
- String[] topic = MQTT_TOPIC.split(",");
- // 3 娑堟伅鍙戝竷璐ㄩ噺
- int[] qos = getQos(topic.length);
- // 4 鏈�鍚庤缃�
- create(options, topic, qos);
+ // 3 鏈�鍚庤缃�
+ create(options);
} catch (Exception e) {
log.error("mqtt杩炴帴寮傚父锛�" + e);
}
@@ -97,69 +90,37 @@
// 璁剧疆浼氳瘽蹇冭烦鏃堕棿
options.setKeepAliveInterval(MQTT_KEEP_ALIVE);
// 鏄惁娓呴櫎session
- options.setCleanSession(true);
+ options.setCleanSession(false);
log.debug("--鐢熸垚mqtt閰嶇疆瀵硅薄");
return options;
}
- /**
- * qos --- 3 ---
- */
- public int[] getQos(int length) {
-
- int[] qos = new int[length];
- for (int i = 0; i < length; i++) {
- /**
- * MQTT鍗忚涓湁涓夌娑堟伅鍙戝竷鏈嶅姟璐ㄩ噺:
- *
- * QOS0锛� 鈥滆嚦澶氫竴娆♀�濓紝娑堟伅鍙戝竷瀹屽叏渚濊禆搴曞眰 TCP/IP 缃戠粶銆備細鍙戠敓娑堟伅涓㈠け鎴栭噸澶嶃�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鐜浼犳劅鍣ㄦ暟鎹紝涓㈠け涓�娆¤璁板綍鏃犳墍璋擄紝鍥犱负涓嶄箙鍚庤繕浼氭湁绗簩娆″彂閫併��
- * QOS1锛� 鈥滆嚦灏戜竴娆♀�濓紝纭繚娑堟伅鍒拌揪锛屼絾娑堟伅閲嶅鍙兘浼氬彂鐢熴��
- * QOS2锛� 鈥滃彧鏈変竴娆♀�濓紝纭繚娑堟伅鍒拌揪涓�娆°�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鍦ㄨ璐圭郴缁熶腑锛屾秷鎭噸澶嶆垨涓㈠け浼氬鑷翠笉姝g‘鐨勭粨鏋滐紝璧勬簮寮�閿�澶�
- */
- qos[i] = 1;
- }
- log.debug("--璁剧疆娑堟伅鍙戝竷璐ㄩ噺");
- return qos;
- }
/**
- * 瑁呰浇鍚勭瀹炰緥鍜岃闃呬富棰� --- 4 ---
+ * 杩炴帴骞惰杞藉洖璋� --- 3 ---
*/
- public void create(MqttConnectOptions options, String[] topic, int[] qos) {
+ public void create(MqttConnectOptions options) {
try {
- client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
+ client.setCallback(new MqttProducerCallback(client, options));
log.debug("--娣诲姞鍥炶皟澶勭悊绫�");
client.connect(options);
} catch (Exception e) {
- log.info("瑁呰浇瀹炰緥鎴栬闃呬富棰樺紓甯革細" + e);
+ log.info("杩炴帴骞惰杞藉洖璋冨紓甯革細" + e);
}
}
- /**
- * 璁㈤槄鏌愪釜涓婚
- *
- * @param topic
- * @param qos
- */
- public void subscribe(String topic, int qos) {
- try {
- log.debug("topic:" + topic);
- client.subscribe(topic, qos);
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
+
/**
* 鍙戝竷锛岄潪鎸佷箙鍖�
* <p>
- * qos鏍规嵁鏂囨。璁剧疆涓�1
+ * qos鏍规嵁鏂囨。璁剧疆涓�2
*
* @param topic
* @param msg
*/
public static void publish(String topic, String msg) {
- publish(1, false, topic, msg);
+ publish(2, false, topic, msg);
}
/**
diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java b/src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java
similarity index 72%
rename from src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java
rename to src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java
index b0ee239..af6cac1 100644
--- a/src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java
+++ b/src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java
@@ -2,6 +2,7 @@
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
+
import java.util.Arrays;
/**
@@ -12,18 +13,14 @@
* @Version: 1.0
**/
@Slf4j(topic = "mqtt")
-public class MqttConsumerCallback implements MqttCallbackExtended {
+public class MqttProducerCallback implements MqttCallbackExtended {
private MqttClient client;
private MqttConnectOptions options;
- private String[] topic;
- private int[] qos;
- public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
+ public MqttProducerCallback(MqttClient client, MqttConnectOptions options) {
this.client = client;
this.options = options;
- this.topic = topic;
- this.qos = qos;
}
/**
@@ -73,22 +70,18 @@
}
/**
- * mqtt杩炴帴鍚庤闃呬富棰�
+ * mqtt杩炴帴瀹屾垚
*/
@Override
public void connectComplete(boolean b, String s) {
try {
- if (null != topic && null != qos) {
- if (client.isConnected()) {
- client.subscribe(topic, qos);
- log.debug("mqtt杩炴帴鎴愬姛");
- log.debug("--璁㈤槄涓婚:锛�" + Arrays.toString(topic));
- } else {
- log.debug("mqtt杩炴帴澶辫触");
- }
+ if (client.isConnected()) {
+ log.info("mqtt杩炴帴鎴愬姛");
+ } else {
+ log.info("mqtt杩炴帴澶辫触");
}
} catch (Exception e) {
- log.error("mqtt璁㈤槄涓婚寮傚父:" + e);
+ log.error("mqtt杩炴帴寮傚父:" + e);
}
}
}
diff --git a/src/main/java/com/ard/utils/tcp/ClientHandler.java b/src/main/java/com/ard/utils/tcp/ClientHandler.java
index dd35018..6df4b26 100644
--- a/src/main/java/com/ard/utils/tcp/ClientHandler.java
+++ b/src/main/java/com/ard/utils/tcp/ClientHandler.java
@@ -6,7 +6,7 @@
import com.ard.alarm.radar.domain.RadarAlarmData;
import com.ard.utils.other.ByteUtils;
import com.ard.utils.other.GisUtils;
-import com.ard.utils.mqtt.MqttConsumer;
+import com.ard.utils.mqtt.MqttProducer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
@@ -432,10 +432,10 @@
radarAlarmData.setRadarName(radarName);
radarAlarmData.setAlarmTime(alarmTime);
radarAlarmData.setArdAlarmRadars(radarAlarmInfos);
- MqttConsumer.publish(2, false, "radar", JSON.toJSONString(radarAlarmData));
+ MqttProducer.publish(2, false, "radar", JSON.toJSONString(radarAlarmData));
//鎶芥补鏈虹姸鎬丮QTT闃熷垪
radarAlarmData.setArdAlarmRadars(well);
- MqttConsumer.publish(2, false, "radarWellData", JSON.toJSONString(radarAlarmData));
+ MqttProducer.publish(2, false, "radarWellData", JSON.toJSONString(radarAlarmData));
} catch (Exception ex) {
log.error("闆疯揪鎶ユ枃瑙f瀽寮傚父:" + ex.getMessage());
}
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index 83184ea..a7b1837 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -39,10 +39,9 @@
# mqtt閰嶇疆
mqtt:
host: tcp://192.168.2.15:1883
- clientId: c227
+ clientId: client-227
username: admin
password: admin
- topic: tube
timeout: 100
keepalive: 60
enabled: true
--
Gitblit v1.9.3