‘liusuyi’
2023-07-05 1fd64b07ddb99c2d9cc8a358b71aceb6a2c81492
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package com.ard.utils.mqtt;
 
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
 * @Description: mqtt消费客户端
 * @ClassName: MqttConsumer
 * @Author: 刘苏义
 * @Date: 2023年05月29日9:55
 * @Version: 1.0
 **/
@Component
@Slf4j(topic = "mqtt")
public class MqttConsumer implements ApplicationRunner {
 
    private static MqttClient client;
 
    @Override
    public void run(ApplicationArguments args) {
        log.info("初始化并启动mqtt......");
        if(PropertiesUtil.MQTT_ENABLED)
        {
            this.connect();
        }
    }
 
    /**
     * 连接mqtt服务器
     */
    private void connect() {
        try {
            // 1 创建客户端
            getClient();
            // 2 设置配置
            MqttConnectOptions options = getOptions();
            String[] topic = PropertiesUtil.MQTT_TOPIC.split(",");
            // 3 消息发布质量
            int[] qos = getQos(topic.length);
            // 4 最后设置
            create(options, topic, qos);
        } catch (Exception e) {
            log.error("mqtt连接异常:" + e);
        }
    }
 
    /**
     *  创建客户端  --- 1 ---
     */
    public void getClient() {
        try {
            if (null == client) {
                client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());
            }
            log.info("--创建mqtt客户端");
        } catch (Exception e) {
            log.error("创建mqtt客户端异常:" + e);
        }
    }
 
    /**
     *  生成配置对象,用户名,密码等  --- 2 ---
     */
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        //设置用户名密码
        options.setUserName(PropertiesUtil.MQTT_USER_NAME);
        options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
        // 设置超时时间
        options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
        // 设置会话心跳时间
        options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
        // 是否清除session
        options.setCleanSession(true);
        log.info("--生成mqtt配置对象");
        return options;
    }
 
    /**
     *  qos   --- 3 ---
     */
    public int[] getQos(int length) {
 
        int[] qos = new int[length];
        for (int i = 0; i < length; i++) {
            /**
             *  MQTT协议中有三种消息发布服务质量:
             *
             * QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
             * QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。
             * QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大
             */
            qos[i] = 1;
        }
        log.info("--设置消息发布质量");
        return qos;
    }
 
    /**
     *  装载各种实例和订阅主题  --- 4 ---
     */
    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
        try {
            client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
            log.info("--添加回调处理类");
            client.connect(options);
        } catch (Exception e) {
            log.info("装载实例或订阅主题异常:" + e);
        }
    }
 
    /**
     * 订阅某个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            log.info("topic:" + topic);
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 发布,非持久化
     *
     *  qos根据文档设置为1
     *
     * @param topic
     * @param msg
     */
    public static void publish(String topic, String msg) {
        publish(1, false, topic, msg);
    }
 
    /**
     * 发布
     */
    public static void publish(int qos, boolean retained, String topic, String pushMessage) {
        log.info("【主题】:" + topic + "【qos】:" + qos + "【pushMessage】:" + pushMessage);
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        try {
            message.setPayload(pushMessage.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            log.error("mqtt编码异常:" + e.getMessage());
        }
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            log.error("topic:" + topic + " 不存在");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
            if (token.isComplete()) {
                log.info("消息发送成功");
            }
        } catch (MqttPersistenceException e) {
            log.error("mqtt持久异常:" + e.getMessage());
        } catch (MqttException e) {
            log.error("mqtt异常:" + e.getMessage());
        }
    }
}