‘liusuyi’
2023-10-08 157c26f5188c7ed62a4547f7e3b5a5a3e3ed7729
src/main/java/com/ard/utils/mqtt/MqttProducer.java
ÎļþÃû´Ó src/main/java/com/ard/utils/mqtt/MqttConsumer.java ÐÞ¸Ä
@@ -1,21 +1,19 @@
package com.ard.utils.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.expression.spel.ast.NullLiteral;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
 * @Description: mqtt消费客户端
 * @ClassName: MqttConsumer
 * @Description: mqtt生产客户端
 * @ClassName: MqttProducer
 * @Author: åˆ˜è‹ä¹‰
 * @Date: 2023å¹´05月29日9:55
 * @Version: 1.0
@@ -23,11 +21,9 @@
@Component
@Slf4j(topic = "mqtt")
@Order(1)
public class MqttConsumer implements ApplicationRunner {
public class MqttProducer implements ApplicationRunner {
    @Value("${spring.mqtt.enabled}")
    private Boolean MQTT_ENABLED;
    @Value("${spring.mqtt.topic}")
    private String MQTT_TOPIC;
    @Value("${spring.mqtt.host}")
    private String MQTT_HOST;
    @Value("${spring.mqtt.clientId}")
@@ -60,11 +56,8 @@
            getClient();
            // 2 è®¾ç½®é…ç½®
            MqttConnectOptions options = getOptions();
            String[] topic = MQTT_TOPIC.split(",");
            // 3 æ¶ˆæ¯å‘布质量
            int[] qos = getQos(topic.length);
            // 4 æœ€åŽè®¾ç½®
            create(options, topic, qos);
            // 3 æœ€åŽè®¾ç½®
            create(options);
        } catch (Exception e) {
            log.error("mqtt连接异常:" + e);
        }
@@ -97,69 +90,37 @@
        // è®¾ç½®ä¼šè¯å¿ƒè·³æ—¶é—´
        options.setKeepAliveInterval(MQTT_KEEP_ALIVE);
        // æ˜¯å¦æ¸…除session
        options.setCleanSession(true);
        options.setCleanSession(false);
        log.debug("--生成mqtt配置对象");
        return options;
    }
    /**
     * qos   --- 3 ---
     */
    public int[] getQos(int length) {
        int[] qos = new int[length];
        for (int i = 0; i < length; i++) {
            /**
             *  MQTT协议中有三种消息发布服务质量:
             *
             * QOS0: â€œè‡³å¤šä¸€æ¬¡â€ï¼Œæ¶ˆæ¯å‘布完全依赖底层 TCP/IP ç½‘络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
             * QOS1: â€œè‡³å°‘一次”,确保消息到达,但消息重复可能会发生。
             * QOS2: â€œåªæœ‰ä¸€æ¬¡â€ï¼Œç¡®ä¿æ¶ˆæ¯åˆ°è¾¾ä¸€æ¬¡ã€‚这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大
             */
            qos[i] = 1;
        }
        log.debug("--设置消息发布质量");
        return qos;
    }
    /**
     * è£…载各种实例和订阅主题  --- 4 ---
     * è¿žæŽ¥å¹¶è£…载回调 --- 3 ---
     */
    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
    public void create(MqttConnectOptions options) {
        try {
            client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
            client.setCallback(new MqttProducerCallback(client, options));
            log.debug("--添加回调处理类");
            client.connect(options);
        } catch (Exception e) {
            log.info("装载实例或订阅主题异常:" + e);
            log.info("连接并装载回调异常:" + e);
        }
    }
    /**
     * è®¢é˜…某个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            log.debug("topic:" + topic);
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * å‘布,非持久化
     * <p>
     * qos根据文档设置为1
     * qos根据文档设置为2
     *
     * @param topic
     * @param msg
     */
    public static void publish(String topic, String msg) {
        publish(1, false, topic, msg);
        publish(2, false, topic, msg);
    }
    /**