‘liusuyi’
2023-07-07 85446b5b526ac53af9add7c83cfd72f39ec39611
src/main/java/com/ard/utils/mqtt/MqttConsumer.java
@@ -1,12 +1,17 @@
package com.ard.utils.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.expression.spel.ast.NullLiteral;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
 * @Description: mqtt消费客户端
 * @ClassName: MqttConsumer
@@ -16,6 +21,7 @@
 **/
@Component
@Slf4j(topic = "mqtt")
@Order(1)
public class MqttConsumer implements ApplicationRunner {
    private static MqttClient client;
@@ -23,8 +29,7 @@
    @Override
    public void run(ApplicationArguments args) {
        log.info("初始化并启动mqtt......");
        if(PropertiesUtil.MQTT_ENABLED)
        {
        if (PropertiesUtil.MQTT_ENABLED) {
            this.connect();
        }
    }
@@ -49,7 +54,7 @@
    }
    /**
     *  创建客户端  --- 1 ---
     * 创建客户端  --- 1 ---
     */
    public void getClient() {
        try {
@@ -63,7 +68,7 @@
    }
    /**
     *  生成配置对象,用户名,密码等  --- 2 ---
     * 生成配置对象,用户名,密码等  --- 2 ---
     */
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
@@ -81,7 +86,7 @@
    }
    /**
     *  qos   --- 3 ---
     * qos   --- 3 ---
     */
    public int[] getQos(int length) {
@@ -101,7 +106,7 @@
    }
    /**
     *  装载各种实例和订阅主题  --- 4 ---
     * 装载各种实例和订阅主题  --- 4 ---
     */
    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
        try {
@@ -130,8 +135,8 @@
    /**
     * 发布,非持久化
     *
     *  qos根据文档设置为1
     * <p>
     * qos根据文档设置为1
     *
     * @param topic
     * @param msg
@@ -144,30 +149,32 @@
     * 发布
     */
    public static void publish(int qos, boolean retained, String topic, String pushMessage) {
        log.info("【主题】:" + topic + "【qos】:" + qos + "【pushMessage】:" + pushMessage);
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        try {
            message.setPayload(pushMessage.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            log.error("mqtt编码异常:" + e.getMessage());
        }
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            log.error("topic:" + topic + " 不存在");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
            if (token.isComplete()) {
                log.info("消息发送成功");
        if (client != null) {
            log.info("【主题】:" + topic + "【qos】:" + qos + "【pushMessage】:" + pushMessage);
            MqttMessage message = new MqttMessage();
            message.setQos(qos);
            message.setRetained(retained);
            try {
                message.setPayload(pushMessage.getBytes("UTF-8"));
            } catch (UnsupportedEncodingException e) {
                log.error("mqtt编码异常:" + e.getMessage());
            }
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
            MqttTopic mTopic = client.getTopic(topic);
            if (null == mTopic) {
                log.error("topic:" + topic + " 不存在");
            }
            MqttDeliveryToken token;
            try {
                token = mTopic.publish(message);
                token.waitForCompletion();
                if (token.isComplete()) {
                    log.info("消息发送成功");
                }
            } catch (MqttPersistenceException e) {
                log.error("mqtt持久异常:" + e.getMessage());
            } catch (MqttException e) {
                log.error("mqtt异常:" + e.getMessage());
            }
        }
    }
}