文件名从 src/main/java/com/ard/utils/mqtt/MqttConsumer.java 修改 |
| | |
| | | package com.ard.utils.mqtt; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.eclipse.paho.client.mqttv3.*; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.boot.ApplicationArguments; |
| | | import org.springframework.boot.ApplicationRunner; |
| | | import org.springframework.core.annotation.Order; |
| | | import org.springframework.expression.spel.ast.NullLiteral; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.io.UnsupportedEncodingException; |
| | | |
| | | /** |
| | | * @Description: mqtt消费客户端 |
| | | * @ClassName: MqttConsumer |
| | | * @Description: mqtt生产客户端 |
| | | * @ClassName: MqttProducer |
| | | * @Author: åèä¹ |
| | | * @Date: 2023年05月29日9:55 |
| | | * @Version: 1.0 |
| | |
| | | @Component |
| | | @Slf4j(topic = "mqtt") |
| | | @Order(1) |
| | | public class MqttConsumer implements ApplicationRunner { |
| | | public class MqttProducer implements ApplicationRunner { |
| | | @Value("${spring.mqtt.enabled}") |
| | | private Boolean MQTT_ENABLED; |
| | | @Value("${spring.mqtt.topic}") |
| | | private String MQTT_TOPIC; |
| | | @Value("${spring.mqtt.host}") |
| | | private String MQTT_HOST; |
| | | @Value("${spring.mqtt.clientId}") |
| | |
| | | getClient(); |
| | | // 2 设置配置 |
| | | MqttConnectOptions options = getOptions(); |
| | | String[] topic = MQTT_TOPIC.split(","); |
| | | // 3 消息发布质量 |
| | | int[] qos = getQos(topic.length); |
| | | // 4 最后设置 |
| | | create(options, topic, qos); |
| | | // 3 最后设置 |
| | | create(options); |
| | | } catch (Exception e) { |
| | | log.error("mqttè¿æ¥å¼å¸¸ï¼" + e); |
| | | } |
| | |
| | | // 设置会话心跳时间 |
| | | options.setKeepAliveInterval(MQTT_KEEP_ALIVE); |
| | | // 是否清除session |
| | | options.setCleanSession(true); |
| | | options.setCleanSession(false); |
| | | log.debug("--çæmqtté
置对象"); |
| | | return options; |
| | | } |
| | | |
| | | /** |
| | | * qos --- 3 --- |
| | | */ |
| | | public int[] getQos(int length) { |
| | | |
| | | int[] qos = new int[length]; |
| | | for (int i = 0; i < length; i++) { |
| | | /** |
| | | * MQTT协议中有三种消息发布服务质量: |
| | | * |
| | | * QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。 |
| | | * QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。 |
| | | * QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大 |
| | | */ |
| | | qos[i] = 1; |
| | | } |
| | | log.debug("--è®¾ç½®æ¶æ¯åå¸è´¨é"); |
| | | return qos; |
| | | } |
| | | |
| | | /** |
| | | * 装载各种实例和订阅主题 --- 4 --- |
| | | * 连接并装载回调 --- 3 --- |
| | | */ |
| | | public void create(MqttConnectOptions options, String[] topic, int[] qos) { |
| | | public void create(MqttConnectOptions options) { |
| | | try { |
| | | client.setCallback(new MqttConsumerCallback(client, options, topic, qos)); |
| | | client.setCallback(new MqttProducerCallback(client, options)); |
| | | log.debug("--æ·»å åè°å¤çç±»"); |
| | | client.connect(options); |
| | | } catch (Exception e) { |
| | | log.info("è£
è½½å®ä¾æè®¢é
主é¢å¼å¸¸ï¼" + e); |
| | | log.info("è¿æ¥å¹¶è£
è½½åè°å¼å¸¸ï¼" + e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 订阅某个主题 |
| | | * |
| | | * @param topic |
| | | * @param qos |
| | | */ |
| | | public void subscribe(String topic, int qos) { /* Subscribes `client` to `topic` with the QoS value `qos`. */ |
| | | try { |
| | | log.debug("topic:" + topic); /* Record the topic name before the subscribe call. */ |
| | | client.subscribe(topic, qos); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); /* NOTE(review): the failure is only printed as a stack trace; it is neither rethrown nor sent through `log`, so callers cannot tell that the subscribe failed. */ |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 发布,非持久化 |
| | | * <p> |
| | | * qos根据文档设置为1 |
| | | * qos根据文档设置为2 |
| | | * |
| | | * @param topic |
| | | * @param msg |
| | | */ |
| | | public static void publish(String topic, String msg) { /* Convenience overload: forwards `topic` and `msg` to the four-argument publish with fixed first and second arguments. */ |
| | | publish(1, false, topic, msg); /* NOTE(review): presumably the pre-change side of this diff (first argument 1, matching the "qos ... 1" doc line) — confirm against the commit. */ |
| | | publish(2, false, topic, msg); /* NOTE(review): presumably the post-change side of this diff (first argument 2, matching the "qos ... 2" doc line) — confirm; only one of these two calls exists in the real file. */ |
| | | } |
| | | |
| | | /** |