‘liusuyi’
2023-12-05 745d5b90be7d8cdb8873b18d18b286fdc4b6913b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.ard.utils.mqtt;
 
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
 
/**
 * @Description: mqtt生产客户端
 * @ClassName: MqttProducer
 * @Author: 刘苏义
 * @Date: 2023年05月29日9:55
 * @Version: 1.0
 **/
@Component
@Slf4j(topic = "mqtt")
@Order(1)
public class MqttProducer implements ApplicationRunner {
    @Resource
    MqttConfiguration mqttConfig;
 
    private static MqttClient client;
 
    @Override
    public void run(ApplicationArguments args) {
        log.debug("初始化并启动mqtt......");
        if (mqttConfig.getEnabled()) {
            this.connect();
        }
    }
 
    /**
     * 连接mqtt服务器
     */
    private void connect() {
        try {
            // 1 创建客户端
            getClient();
            // 2 设置配置
            MqttConnectOptions options = getOptions();
            options.setMaxInflight(1000);
            // 3 最后设置
            create(options);
        } catch (Exception e) {
            log.error("mqtt连接异常:" + e);
        }
    }
 
    /**
     * 创建客户端  --- 1 ---
     */
    public void getClient() {
        try {
            if (null == client) {
                client = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientId(), new MemoryPersistence());
            }
            log.debug("--创建mqtt客户端");
        } catch (Exception e) {
            log.error("创建mqtt客户端异常:" + e);
        }
    }
 
    /**
     * 生成配置对象,用户名,密码等  --- 2 ---
     */
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        //设置用户名密码
        options.setUserName(mqttConfig.getUsername());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        // 设置超时时间
        options.setConnectionTimeout(mqttConfig.getTimeout());
        // 设置会话心跳时间
        options.setKeepAliveInterval(mqttConfig.getKeepalive());
        // 是否清除session
        options.setCleanSession(false);
        log.debug("--生成mqtt配置对象");
        return options;
    }
 
 
    /**
     * 连接并装载回调 --- 3 ---
     */
    public void create(MqttConnectOptions options) {
        try {
            client.setCallback(new MqttProducerCallback(client, options));
            log.debug("--添加回调处理类");
            client.connect(options);
        } catch (Exception e) {
            log.info("连接并装载回调异常:" + e);
        }
    }
 
 
 
    /**
     * 发布,非持久化
     * <p>
     * qos根据文档设置为2
     *
     * @param topic
     * @param msg
     */
    public static void publish(String topic, String msg) {
        publish(2, false, topic, msg);
    }
 
    /**
     * 发布
     */
    public static void publish(int qos, boolean retained, String topic, String pushMessage) {
        if (client != null) {
            log.debug("【主题】:" + topic + "【qos】:" + qos + "【pushMessage】:" + pushMessage);
            MqttMessage message = new MqttMessage();
            message.setQos(qos);
            message.setRetained(retained);
            try {
                message.setPayload(pushMessage.getBytes("UTF-8"));
            } catch (UnsupportedEncodingException e) {
                log.error("mqtt编码异常:" + e.getMessage());
            }
            MqttTopic mTopic = client.getTopic(topic);
            if (null == mTopic) {
                log.error("topic:" + topic + " 不存在");
            }
            MqttDeliveryToken token;
            try {
                token = mTopic.publish(message);
                token.waitForCompletion();
                if (token.isComplete()) {
                    log.debug("消息发送成功");
                }
            } catch (MqttPersistenceException e) {
                log.error("mqtt持久异常:" + e.getMessage());
            } catch (MqttException e) {
                log.error("mqtt异常:" + e.getMessage());
            }
        }
    }
}