From 157c26f5188c7ed62a4547f7e3b5a5a3e3ed7729 Mon Sep 17 00:00:00 2001 From: ‘liusuyi’ <1951119284@qq.com> Date: 星期日, 08 十月 2023 09:40:47 +0800 Subject: [PATCH] 优化mqtt生产者取消消费者订阅 --- src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java | 5 - src/main/java/com/ard/utils/tcp/ClientHandler.java | 6 +- src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java | 6 +- src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java | 8 +- src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java | 25 +++----- src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java | 4 src/main/resources/application-dev.yml | 3 src/main/java/com/ard/utils/mqtt/MqttProducer.java | 65 ++++----------------- src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java | 4 9 files changed, 39 insertions(+), 87 deletions(-) diff --git a/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java b/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java index f002e47..dba5f71 100644 --- a/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java +++ b/src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java @@ -1,10 +1,9 @@ package com.ard.alarm.apponekey.service.impl; import java.util.Date; -import java.util.List; import com.alibaba.fastjson2.JSON; -import com.ard.utils.mqtt.MqttConsumer; +import com.ard.utils.mqtt.MqttProducer; import com.ard.utils.uuid.IdUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -34,7 +33,7 @@ ardAlarmApponekey.setId(IdUtils.fastSimpleUUID()); ardAlarmApponekey.setCreateTime(new Date()); printLog(ardAlarmApponekey); - MqttConsumer.publish(2, false, "apponekey", JSON.toJSONString(ardAlarmApponekey)); + MqttProducer.publish(2, false, "apponekey", JSON.toJSONString(ardAlarmApponekey)); return 1; } catch (Exception ex) { log.error("涓�閿姤璀﹀鐞嗗紓甯�:" + ex.getMessage()); diff --git a/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java b/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java index 981f2a8..8b04e6f 100644 --- a/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java +++ b/src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java @@ -4,7 +4,7 @@ import com.ard.alarm.digitization.model.DataBridge; import com.ard.alarm.digitization.service.DataBridgeService; import com.ard.utils.jdbc.Query; -import com.ard.utils.mqtt.MqttConsumer; +import com.ard.utils.mqtt.MqttProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; @@ -52,7 +52,7 @@ data.setWellNo(elem[0]); data.setAlarmType(elem[1]); data.setAlarmTime(elem[2]); - MqttConsumer.publish(2, false, "digitization3", JSON.toJSONString(data)); + MqttProducer.publish(2, false, "digitization3", JSON.toJSONString(data)); } } } catch (Exception ex) { diff --git a/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java b/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java index ced8991..2714637 100644 --- a/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java +++ b/src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java @@ -4,13 +4,13 @@ import com.alibaba.fastjson2.JSONObject; import com.ard.alarm.stealelec.domain.ArdAlarmStealelec; import com.ard.utils.http.HttpUtils; -import com.ard.utils.mqtt.MqttConsumer; +import com.ard.utils.mqtt.MqttProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import javax.annotation.Resource; + import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -68,7 +68,7 @@ if (tempList.contains(wd.getId())) { continue; } - MqttConsumer.publish(2, false, "stealelec", JSON.toJSONString(wd)); + MqttProducer.publish(2, false, "stealelec", JSON.toJSONString(wd)); tempList.add(wd.getId()); } } diff --git a/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java b/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java index 06584e6..f15cb3f 100644 --- a/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java +++ b/src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java @@ -2,7 +2,7 @@ import com.alibaba.fastjson2.JSON; import com.ard.utils.other.DateUtils; -import com.ard.utils.mqtt.MqttConsumer; +import com.ard.utils.mqtt.MqttProducer; import com.ard.utils.udp.NettyUdpServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -73,7 +73,7 @@ log.debug("浣嶇疆锛�" + position); log.debug("鍊肩彮浜猴細" + watcher); log.debug("缁撴潫瑙f瀽" + stop); - MqttConsumer.publish(2, false, "tube", JSON.toJSONString(map)); + MqttProducer.publish(2, false, "tube", JSON.toJSONString(map)); } else { log.error("鏁版嵁寮傚父"); } diff --git a/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java b/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java index 62f68de..7deea3e 100644 --- a/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java +++ b/src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java @@ -14,7 +14,7 @@ import com.ard.utils.hiksdk.domain.ExternalAlarmEventInfo; import com.ard.utils.hiksdk.util.hikSdkUtil.HCNetSDK; import com.ard.utils.other.DateUtils; -import com.ard.utils.mqtt.MqttConsumer; +import com.ard.utils.mqtt.MqttProducer; import com.ard.utils.spring.SpringUtils; import com.ard.utils.uuid.IdUtils; import com.sun.jna.Pointer; @@ -440,7 +440,7 @@ */ private void publishMqtt(CameraEventInfo info) { printLog(info); - MqttConsumer.publish(2, false, "camera", JSON.toJSONString(info)); + MqttProducer.publish(2, false, "camera", JSON.toJSONString(info)); } /** @@ -448,7 +448,7 @@ */ private void publishMqtt(ExternalAlarmEventInfo info) { printLog(info); - MqttConsumer.publish(2, false, "external", JSON.toJSONString(info)); + MqttProducer.publish(2, false, "external", JSON.toJSONString(info)); } /** @@ -456,7 +456,7 @@ */ private void publishMqtt(AccessControlHostEventInfo info) { printLog(info); - MqttConsumer.publish(2, false, "accessControl", JSON.toJSONString(info)); + MqttProducer.publish(2, false, "accessControl", JSON.toJSONString(info)); } /** diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java b/src/main/java/com/ard/utils/mqtt/MqttProducer.java similarity index 66% rename from src/main/java/com/ard/utils/mqtt/MqttConsumer.java rename to src/main/java/com/ard/utils/mqtt/MqttProducer.java index fc03b39..aef7240 100644 --- a/src/main/java/com/ard/utils/mqtt/MqttConsumer.java +++ b/src/main/java/com/ard/utils/mqtt/MqttProducer.java @@ -1,21 +1,19 @@ package com.ard.utils.mqtt; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; -import org.springframework.expression.spel.ast.NullLiteral; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; /** - * @Description: mqtt娑堣垂瀹㈡埛绔� - * @ClassName: MqttConsumer + * @Description: mqtt鐢熶骇瀹㈡埛绔� + * @ClassName: MqttProducer * @Author: 鍒樿嫃涔� * @Date: 2023骞�05鏈�29鏃�9:55 * @Version: 1.0 @@ -23,11 +21,9 @@ @Component @Slf4j(topic = "mqtt") @Order(1) -public class MqttConsumer implements ApplicationRunner { +public class MqttProducer implements ApplicationRunner { @Value("${spring.mqtt.enabled}") private Boolean MQTT_ENABLED; - @Value("${spring.mqtt.topic}") - private String MQTT_TOPIC; @Value("${spring.mqtt.host}") private String MQTT_HOST; @Value("${spring.mqtt.clientId}") @@ -60,11 +56,8 @@ getClient(); // 2 璁剧疆閰嶇疆 MqttConnectOptions options = getOptions(); - String[] topic = MQTT_TOPIC.split(","); - // 3 娑堟伅鍙戝竷璐ㄩ噺 - int[] qos = getQos(topic.length); - // 4 鏈�鍚庤缃� - create(options, topic, qos); + // 3 鏈�鍚庤缃� + create(options); } catch (Exception e) { log.error("mqtt杩炴帴寮傚父锛�" + e); } @@ -97,69 +90,37 @@ // 璁剧疆浼氳瘽蹇冭烦鏃堕棿 options.setKeepAliveInterval(MQTT_KEEP_ALIVE); // 鏄惁娓呴櫎session - options.setCleanSession(true); + options.setCleanSession(false); log.debug("--鐢熸垚mqtt閰嶇疆瀵硅薄"); return options; } - /** - * qos --- 3 --- - */ - public int[] getQos(int length) { - - int[] qos = new int[length]; - for (int i = 0; i < length; i++) { - /** - * MQTT鍗忚涓湁涓夌娑堟伅鍙戝竷鏈嶅姟璐ㄩ噺: - * - * QOS0锛� 鈥滆嚦澶氫竴娆♀�濓紝娑堟伅鍙戝竷瀹屽叏渚濊禆搴曞眰 TCP/IP 缃戠粶銆備細鍙戠敓娑堟伅涓㈠け鎴栭噸澶嶃�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鐜浼犳劅鍣ㄦ暟鎹紝涓㈠け涓�娆¤璁板綍鏃犳墍璋擄紝鍥犱负涓嶄箙鍚庤繕浼氭湁绗簩娆″彂閫併�� - * QOS1锛� 鈥滆嚦灏戜竴娆♀�濓紝纭繚娑堟伅鍒拌揪锛屼絾娑堟伅閲嶅鍙兘浼氬彂鐢熴�� - * QOS2锛� 鈥滃彧鏈変竴娆♀�濓紝纭繚娑堟伅鍒拌揪涓�娆°�傝繖涓�绾у埆鍙敤浜庡涓嬫儏鍐碉紝鍦ㄨ璐圭郴缁熶腑锛屾秷鎭噸澶嶆垨涓㈠け浼氬鑷翠笉姝g‘鐨勭粨鏋滐紝璧勬簮寮�閿�澶� - */ - qos[i] = 1; - } - log.debug("--璁剧疆娑堟伅鍙戝竷璐ㄩ噺"); - return qos; - } /** - * 瑁呰浇鍚勭瀹炰緥鍜岃闃呬富棰� --- 4 --- + * 杩炴帴骞惰杞藉洖璋� --- 3 --- */ - public void create(MqttConnectOptions options, String[] topic, int[] qos) { + public void create(MqttConnectOptions options) { try { - client.setCallback(new MqttConsumerCallback(client, options, topic, qos)); + client.setCallback(new MqttProducerCallback(client, options)); log.debug("--娣诲姞鍥炶皟澶勭悊绫�"); client.connect(options); } catch (Exception e) { - log.info("瑁呰浇瀹炰緥鎴栬闃呬富棰樺紓甯革細" + e); + log.info("杩炴帴骞惰杞藉洖璋冨紓甯革細" + e); } } - /** - * 璁㈤槄鏌愪釜涓婚 - * - * @param topic - * @param qos - */ - public void subscribe(String topic, int qos) { - try { - log.debug("topic:" + topic); - client.subscribe(topic, qos); - } catch (MqttException e) { - e.printStackTrace(); - } - } + /** * 鍙戝竷锛岄潪鎸佷箙鍖� * <p> - * qos鏍规嵁鏂囨。璁剧疆涓�1 + * qos鏍规嵁鏂囨。璁剧疆涓�2 * * @param topic * @param msg */ public static void publish(String topic, String msg) { - publish(1, false, topic, msg); + publish(2, false, topic, msg); } /** diff --git a/src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java b/src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java similarity index 72% rename from src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java rename to src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java index b0ee239..af6cac1 100644 --- a/src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java +++ b/src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java @@ -2,6 +2,7 @@ import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; + import java.util.Arrays; /** @@ -12,18 +13,14 @@ * @Version: 1.0 **/ @Slf4j(topic = "mqtt") -public class MqttConsumerCallback implements MqttCallbackExtended { +public class MqttProducerCallback implements MqttCallbackExtended { private MqttClient client; private MqttConnectOptions options; - private String[] topic; - private int[] qos; - public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) { + public MqttProducerCallback(MqttClient client, MqttConnectOptions options) { this.client = client; this.options = options; - this.topic = topic; - this.qos = qos; } /** @@ -73,22 +70,18 @@ } /** - * mqtt杩炴帴鍚庤闃呬富棰� + * mqtt杩炴帴瀹屾垚 */ @Override public void connectComplete(boolean b, String s) { try { - if (null != topic && null != qos) { - if (client.isConnected()) { - client.subscribe(topic, qos); - log.debug("mqtt杩炴帴鎴愬姛"); - log.debug("--璁㈤槄涓婚:锛�" + Arrays.toString(topic)); - } else { - log.debug("mqtt杩炴帴澶辫触"); - } + if (client.isConnected()) { + log.info("mqtt杩炴帴鎴愬姛"); + } else { + log.info("mqtt杩炴帴澶辫触"); } } catch (Exception e) { - log.error("mqtt璁㈤槄涓婚寮傚父:" + e); + log.error("mqtt杩炴帴寮傚父:" + e); } } } diff --git a/src/main/java/com/ard/utils/tcp/ClientHandler.java b/src/main/java/com/ard/utils/tcp/ClientHandler.java index dd35018..6df4b26 100644 --- a/src/main/java/com/ard/utils/tcp/ClientHandler.java +++ b/src/main/java/com/ard/utils/tcp/ClientHandler.java @@ -6,7 +6,7 @@ import com.ard.alarm.radar.domain.RadarAlarmData; import com.ard.utils.other.ByteUtils; import com.ard.utils.other.GisUtils; -import com.ard.utils.mqtt.MqttConsumer; +import com.ard.utils.mqtt.MqttProducer; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; @@ -432,10 +432,10 @@ radarAlarmData.setRadarName(radarName); radarAlarmData.setAlarmTime(alarmTime); radarAlarmData.setArdAlarmRadars(radarAlarmInfos); - MqttConsumer.publish(2, false, "radar", JSON.toJSONString(radarAlarmData)); + MqttProducer.publish(2, false, "radar", JSON.toJSONString(radarAlarmData)); //鎶芥补鏈虹姸鎬丮QTT闃熷垪 radarAlarmData.setArdAlarmRadars(well); - MqttConsumer.publish(2, false, "radarWellData", JSON.toJSONString(radarAlarmData)); + MqttProducer.publish(2, false, "radarWellData", JSON.toJSONString(radarAlarmData)); } catch (Exception ex) { log.error("闆疯揪鎶ユ枃瑙f瀽寮傚父:" + ex.getMessage()); } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 83184ea..a7b1837 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -39,10 +39,9 @@ # mqtt閰嶇疆 mqtt: host: tcp://192.168.2.15:1883 - clientId: c227 + clientId: client-227 username: admin password: admin - topic: tube timeout: 100 keepalive: 60 enabled: true -- Gitblit v1.9.3