re
aijinhui
2023-11-23 eb204e59b0d362a44b04783f76d397380fe77ed9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.ruoyi.utils.qymqtt;
 
import com.ruoyi.common.utils.spring.SpringUtils;
import com.ruoyi.sy.service.ArdSyCarRtuService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import javax.annotation.PreDestroy;
 
/**
 *  mqtt客户端
 */
@Slf4j
@Component
public class MqttCustomerClient {
 
    private static MqttClient client;
 
    private static MqttConnectOptions options;
 
    private PushCallback pushCallback;
 
    public  static MqttClient getClient(){
        return  client;
    }
 
    public static void setClient(MqttClient client){
        MqttCustomerClient.client=client;
    }
 
    public static MqttConnectOptions getOptions() {
        return options;
    }
 
    public static void setOptions(MqttConnectOptions options) {
        MqttCustomerClient.options = options;
    }
 
    /**
     * 客户端连接
     *
     * @param clientID  客户端Id
     */
    public void connect(String clientID){
        MqttClient client;
        try {
            ArdSyCarRtuService carRtuService = SpringUtils.getBean(ArdSyCarRtuService.class);
            //获取电磁锁地址
            String mqttUrl = carRtuService.url("ElectromagneticLockUrl");
            client=new MqttClient(mqttUrl,clientID,new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName("admin");
            options.setPassword("xzx12345".toCharArray());
            //options.setConnectionTimeout(100);
            //options.setKeepAliveInterval(60);
            options.setAutomaticReconnect(true);
            MqttCustomerClient.setClient(client);
            MqttCustomerClient.setOptions(options);
            try {
                client.setCallback(new PushCallback());
                client.connect(options);
            }catch (Exception e){
                e.printStackTrace();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
 
    /**
     * 发布
     * @param topic
     * @param pushMessage
     */
    public void pushlish(String topic,String pushMessage) throws MqttException {
        pushlish(2,false,topic,pushMessage);
    }
 
    /**
     * 发布
     *
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     */
    public void pushlish(int qos,boolean retained,String topic,String pushMessage) throws MqttException {
        MqttMessage message=new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        try{
            client.publish(topic,message);
        }catch (Exception e){
            log.error(topic+"发布失败!");
        }
        client.disconnect();
    }
 
    /**
     * 订阅某个主题
     * @param topic
     */
    public void subscribe(String topic){
        log.info("开始订阅主题" + topic);
        subscribe(topic,2);
    }
 
    public void subscribe(String topic,int qos){
        try {
            MqttCustomerClient.getClient().subscribe(topic,qos);
        }catch (MqttException e){
            e.printStackTrace();
        }
    }
 
    //取消订阅
    public void unSubscribe(String topicFilter){
        try {
            client.unsubscribe(topicFilter);
        } catch (MqttException e) {
            System.out.println("取消订阅主题"+topicFilter+"失败");
            e.getMessage();
        }
    }
 
    //断开连接
    @PreDestroy//该注解对象销毁后触发
    public void disConnect(){
        try {
            client.disconnect();
        } catch (MqttException e) {
            System.out.println("断开连接产生异常");
            e.getMessage();
        }
    }
}