liusuyi
2024-10-10 38f29e38fcc668171dc05c53d40a36b895c86102
ard-work/src/main/java/com/ruoyi/utils/mqtt/MqttConsumer.java
@@ -3,8 +3,10 @@
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
@@ -16,14 +18,33 @@
 **/
@Component
@Slf4j(topic = "mqtt")
@Order(1)
public class MqttConsumer implements ApplicationRunner {
    @Value("${mqtt.enabled}")
    private Boolean MQTT_ENABLED;
    @Value("${mqtt.topic}")
    private String MQTT_TOPIC;
    @Value("${mqtt.host}")
    private String MQTT_HOST;
    @Value("${mqtt.clientId}")
    private String MQTT_CLIENT_ID;
    @Value("${mqtt.username}")
    private String MQTT_USER_NAME;
    @Value("${mqtt.password}")
    private String MQTT_PASSWORD;
    @Value("${mqtt.timeout}")
    private int MQTT_TIMEOUT;
    @Value("${mqtt.keepalive}")
    private int MQTT_KEEP_ALIVE;
    private static MqttClient client;
    @Override
    public void run(ApplicationArguments args) {
        log.info("初始化并启动mqtt......");
        if(PropertiesUtil.MQTT_ENABLED)
        log.debug("初始化并启动mqtt......");
        if(MQTT_ENABLED)
        {
            this.connect();
        }
@@ -38,7 +59,7 @@
            getClient();
            // 2 设置配置
            MqttConnectOptions options = getOptions();
            String[] topic = PropertiesUtil.MQTT_TOPIC.split(",");
            String[] topic = MQTT_TOPIC.split(",");
            // 3 消息发布质量
            int[] qos = getQos(topic.length);
            // 4 最后设置
@@ -54,9 +75,9 @@
    public void getClient() {
        try {
            if (null == client) {
                client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());
                client = new MqttClient(MQTT_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
            }
            log.info("--创建mqtt客户端");
            log.debug("--创建mqtt客户端");
        } catch (Exception e) {
            log.error("创建mqtt客户端异常:" + e);
        }
@@ -68,15 +89,15 @@
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        //设置用户名密码
        options.setUserName(PropertiesUtil.MQTT_USER_NAME);
        options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
        options.setUserName(MQTT_USER_NAME);
        options.setPassword(MQTT_PASSWORD.toCharArray());
        // 设置超时时间
        options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
        options.setConnectionTimeout(MQTT_TIMEOUT);
        // 设置会话心跳时间
        options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
        options.setKeepAliveInterval(MQTT_KEEP_ALIVE);
        // 是否清除session
        options.setCleanSession(false);
        log.info("--生成mqtt配置对象");
        options.setCleanSession(true);
        log.debug("--生成mqtt配置对象");
        return options;
    }
@@ -96,7 +117,7 @@
             */
            qos[i] = 1;
        }
        log.info("--设置消息发布质量");
        log.debug("--设置消息发布质量");
        return qos;
    }
@@ -106,10 +127,10 @@
    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
        try {
            client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
            log.info("--添加回调处理类");
            log.debug("--添加回调处理类");
            client.connect(options);
        } catch (Exception e) {
            log.info("装载实例或订阅主题异常:" + e);
            log.error("装载实例或订阅主题异常:" + e);
        }
    }
@@ -121,7 +142,7 @@
     */
    public void subscribe(String topic, int qos) {
        try {
            log.info("topic:" + topic);
            log.debug("topic:" + topic);
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
@@ -144,7 +165,7 @@
     * 发布
     */
    public static void publish(int qos, boolean retained, String topic, String pushMessage) {
        log.info("【主题】:" + topic + "【qos】:" + qos + "【pushMessage】:" + pushMessage);
        log.debug("【主题】:" + topic + "【qos】:" + qos + "【pushMessage】:" + pushMessage);
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
@@ -162,7 +183,7 @@
            token = mTopic.publish(message);
            token.waitForCompletion();
            if (token.isComplete()) {
                log.info("消息发送成功");
                log.debug("消息发送成功");
            }
        } catch (MqttPersistenceException e) {
            e.printStackTrace();