src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ard/utils/mqtt/MqttProducer.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ard/utils/tcp/ClientHandler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/resources/application-dev.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/ard/alarm/apponekey/service/impl/ArdAlarmApponekeyServiceImpl.java
@@ -1,10 +1,9 @@ package com.ard.alarm.apponekey.service.impl; import java.util.Date; import java.util.List; import com.alibaba.fastjson2.JSON; import com.ard.utils.mqtt.MqttConsumer; import com.ard.utils.mqtt.MqttProducer; import com.ard.utils.uuid.IdUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -34,7 +33,7 @@ ardAlarmApponekey.setId(IdUtils.fastSimpleUUID()); ardAlarmApponekey.setCreateTime(new Date()); printLog(ardAlarmApponekey); MqttConsumer.publish(2, false, "apponekey", JSON.toJSONString(ardAlarmApponekey)); MqttProducer.publish(2, false, "apponekey", JSON.toJSONString(ardAlarmApponekey)); return 1; } catch (Exception ex) { log.error("ä¸é®æ¥è¦å¤çå¼å¸¸:" + ex.getMessage()); src/main/java/com/ard/alarm/digitization/service/impl/DataBridgeServiceImpl.java
@@ -4,7 +4,7 @@ import com.ard.alarm.digitization.model.DataBridge; import com.ard.alarm.digitization.service.DataBridgeService; import com.ard.utils.jdbc.Query; import com.ard.utils.mqtt.MqttConsumer; import com.ard.utils.mqtt.MqttProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; @@ -52,7 +52,7 @@ data.setWellNo(elem[0]); data.setAlarmType(elem[1]); data.setAlarmTime(elem[2]); MqttConsumer.publish(2, false, "digitization3", JSON.toJSONString(data)); MqttProducer.publish(2, false, "digitization3", JSON.toJSONString(data)); } } } catch (Exception ex) { src/main/java/com/ard/alarm/stealelec/service/StealElecAlarmService.java
@@ -4,13 +4,13 @@ import com.alibaba.fastjson2.JSONObject; import com.ard.alarm.stealelec.domain.ArdAlarmStealelec; import com.ard.utils.http.HttpUtils; import com.ard.utils.mqtt.MqttConsumer; import com.ard.utils.mqtt.MqttProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -68,7 +68,7 @@ if (tempList.contains(wd.getId())) { continue; } MqttConsumer.publish(2, false, "stealelec", JSON.toJSONString(wd)); MqttProducer.publish(2, false, "stealelec", JSON.toJSONString(wd)); tempList.add(wd.getId()); } } src/main/java/com/ard/alarm/tube/service/TubeAlarmService.java
@@ -2,7 +2,7 @@ import com.alibaba.fastjson2.JSON; import com.ard.utils.other.DateUtils; import com.ard.utils.mqtt.MqttConsumer; import com.ard.utils.mqtt.MqttProducer; import com.ard.utils.udp.NettyUdpServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -73,7 +73,7 @@ log.debug("ä½ç½®ï¼" + position); log.debug("å¼ç人ï¼" + watcher); log.debug("ç»æè§£æ" + stop); MqttConsumer.publish(2, false, "tube", JSON.toJSONString(map)); MqttProducer.publish(2, false, "tube", JSON.toJSONString(map)); } else { log.error("æ°æ®å¼å¸¸"); } src/main/java/com/ard/utils/hiksdk/service/impl/FMSGCallBack.java
@@ -14,7 +14,7 @@ import com.ard.utils.hiksdk.domain.ExternalAlarmEventInfo; import com.ard.utils.hiksdk.util.hikSdkUtil.HCNetSDK; import com.ard.utils.other.DateUtils; import com.ard.utils.mqtt.MqttConsumer; import com.ard.utils.mqtt.MqttProducer; import com.ard.utils.spring.SpringUtils; import com.ard.utils.uuid.IdUtils; import com.sun.jna.Pointer; @@ -440,7 +440,7 @@ */ private void publishMqtt(CameraEventInfo info) { printLog(info); MqttConsumer.publish(2, false, "camera", JSON.toJSONString(info)); MqttProducer.publish(2, false, "camera", JSON.toJSONString(info)); } /** @@ -448,7 +448,7 @@ */ private void publishMqtt(ExternalAlarmEventInfo info) { printLog(info); MqttConsumer.publish(2, false, "external", JSON.toJSONString(info)); MqttProducer.publish(2, false, "external", JSON.toJSONString(info)); } /** @@ -456,7 +456,7 @@ */ private void publishMqtt(AccessControlHostEventInfo info) { printLog(info); MqttConsumer.publish(2, false, "accessControl", JSON.toJSONString(info)); MqttProducer.publish(2, false, "accessControl", JSON.toJSONString(info)); } /** src/main/java/com/ard/utils/mqtt/MqttProducer.java
ÎļþÃû´Ó src/main/java/com/ard/utils/mqtt/MqttConsumer.java ÐÞ¸Ä @@ -1,21 +1,19 @@ package com.ard.utils.mqtt; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; import org.springframework.expression.spel.ast.NullLiteral; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; /** * @Description: mqttæ¶è´¹å®¢æ·ç«¯ * @ClassName: MqttConsumer * @Description: mqttç产客æ·ç«¯ * @ClassName: MqttProducer * @Author: åèä¹ * @Date: 2023å¹´05æ29æ¥9:55 * @Version: 1.0 @@ -23,11 +21,9 @@ @Component @Slf4j(topic = "mqtt") @Order(1) public class MqttConsumer implements ApplicationRunner { public class MqttProducer implements ApplicationRunner { @Value("${spring.mqtt.enabled}") private Boolean MQTT_ENABLED; @Value("${spring.mqtt.topic}") private String MQTT_TOPIC; @Value("${spring.mqtt.host}") private String MQTT_HOST; @Value("${spring.mqtt.clientId}") @@ -60,11 +56,8 @@ getClient(); // 2 设置é ç½® MqttConnectOptions options = getOptions(); String[] topic = MQTT_TOPIC.split(","); // 3 æ¶æ¯åå¸è´¨é int[] qos = getQos(topic.length); // 4 æå设置 create(options, topic, qos); // 3 æå设置 create(options); } catch (Exception e) { log.error("mqttè¿æ¥å¼å¸¸ï¼" + e); } @@ -97,69 +90,37 @@ // 设置ä¼è¯å¿è·³æ¶é´ options.setKeepAliveInterval(MQTT_KEEP_ALIVE); // æ¯å¦æ¸ é¤session options.setCleanSession(true); options.setCleanSession(false); log.debug("--çæmqtté 置对象"); return options; } /** * qos --- 3 --- */ public int[] getQos(int length) { int[] qos = new int[length]; for (int i = 0; i < length; i++) { /** * MQTTåè®®ä¸æä¸ç§æ¶æ¯å叿å¡è´¨é: * * QOS0ï¼ âè³å¤ä¸æ¬¡âï¼æ¶æ¯åå¸å®å ¨ä¾èµåºå± TCP/IP ç½ç»ãä¼åçæ¶æ¯ä¸¢å¤±æéå¤ãè¿ä¸çº§å«å¯ç¨äºå¦ä¸æ åµï¼ç¯å¢ä¼ æå¨æ°æ®ï¼ä¸¢å¤±ä¸æ¬¡è¯»è®°å½æ æè°ï¼å 为ä¸ä¹ åè¿ä¼æç¬¬äºæ¬¡åéã * QOS1ï¼ âè³å°ä¸æ¬¡âï¼ç¡®ä¿æ¶æ¯å°è¾¾ï¼ä½æ¶æ¯éå¤å¯è½ä¼åçã * QOS2ï¼ âåªæä¸æ¬¡âï¼ç¡®ä¿æ¶æ¯å°è¾¾ä¸æ¬¡ãè¿ä¸çº§å«å¯ç¨äºå¦ä¸æ åµï¼å¨è®¡è´¹ç³»ç»ä¸ï¼æ¶æ¯é夿䏢失ä¼å¯¼è´ä¸æ£ç¡®çç»æï¼èµæºå¼é大 */ qos[i] = 1; } log.debug("--è®¾ç½®æ¶æ¯åå¸è´¨é"); return qos; } /** * è£ è½½åç§å®ä¾å订é ä¸»é¢ --- 4 --- * è¿æ¥å¹¶è£ è½½åè° --- 3 --- */ public void create(MqttConnectOptions options, String[] topic, int[] qos) { public void create(MqttConnectOptions options) { try { client.setCallback(new MqttConsumerCallback(client, options, topic, qos)); client.setCallback(new MqttProducerCallback(client, options)); log.debug("--æ·»å åè°å¤çç±»"); client.connect(options); } catch (Exception e) { log.info("è£ è½½å®ä¾æè®¢é 主é¢å¼å¸¸ï¼" + e); log.info("è¿æ¥å¹¶è£ è½½åè°å¼å¸¸ï¼" + e); } } /** * è®¢é æä¸ªä¸»é¢ * * @param topic * @param qos */ public void subscribe(String topic, int qos) { try { log.debug("topic:" + topic); client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * åå¸ï¼éæä¹ å * <p> * qosæ ¹æ®ææ¡£è®¾ç½®ä¸º1 * qosæ ¹æ®ææ¡£è®¾ç½®ä¸º2 * * @param topic * @param msg */ public static void publish(String topic, String msg) { publish(1, false, topic, msg); publish(2, false, topic, msg); } /** src/main/java/com/ard/utils/mqtt/MqttProducerCallback.java
ÎļþÃû´Ó src/main/java/com/ard/utils/mqtt/MqttConsumerCallback.java ÐÞ¸Ä @@ -2,6 +2,7 @@ import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import java.util.Arrays; /** @@ -12,18 +13,14 @@ * @Version: 1.0 **/ @Slf4j(topic = "mqtt") public class MqttConsumerCallback implements MqttCallbackExtended { public class MqttProducerCallback implements MqttCallbackExtended { private MqttClient client; private MqttConnectOptions options; private String[] topic; private int[] qos; public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) { public MqttProducerCallback(MqttClient client, MqttConnectOptions options) { this.client = client; this.options = options; this.topic = topic; this.qos = qos; } /** @@ -73,22 +70,18 @@ } /** * mqttè¿æ¥å订é ä¸»é¢ * mqttè¿æ¥å®æ */ @Override public void connectComplete(boolean b, String s) { try { if (null != topic && null != qos) { if (client.isConnected()) { client.subscribe(topic, qos); log.debug("mqttè¿æ¥æå"); log.debug("--订é 主é¢:ï¼" + Arrays.toString(topic)); log.info("mqttè¿æ¥æå"); } else { log.debug("mqttè¿æ¥å¤±è´¥"); } log.info("mqttè¿æ¥å¤±è´¥"); } } catch (Exception e) { log.error("mqtt订é 主é¢å¼å¸¸:" + e); log.error("mqttè¿æ¥å¼å¸¸:" + e); } } } src/main/java/com/ard/utils/tcp/ClientHandler.java
@@ -6,7 +6,7 @@ import com.ard.alarm.radar.domain.RadarAlarmData; import com.ard.utils.other.ByteUtils; import com.ard.utils.other.GisUtils; import com.ard.utils.mqtt.MqttConsumer; import com.ard.utils.mqtt.MqttProducer; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; @@ -432,10 +432,10 @@ radarAlarmData.setRadarName(radarName); radarAlarmData.setAlarmTime(alarmTime); radarAlarmData.setArdAlarmRadars(radarAlarmInfos); MqttConsumer.publish(2, false, "radar", JSON.toJSONString(radarAlarmData)); MqttProducer.publish(2, false, "radar", JSON.toJSONString(radarAlarmData)); //æ½æ²¹æºç¶æMQTTéå radarAlarmData.setArdAlarmRadars(well); MqttConsumer.publish(2, false, "radarWellData", JSON.toJSONString(radarAlarmData)); MqttProducer.publish(2, false, "radarWellData", JSON.toJSONString(radarAlarmData)); } catch (Exception ex) { log.error("é·è¾¾æ¥æè§£æå¼å¸¸:" + ex.getMessage()); } src/main/resources/application-dev.yml
@@ -39,10 +39,9 @@ # mqtté ç½® mqtt: host: tcp://192.168.2.15:1883 clientId: c227 clientId: client-227 username: admin password: admin topic: tube timeout: 100 keepalive: 60 enabled: true