package com.ruoyi.utils.qymqtt;
|
|
import com.ruoyi.common.utils.spring.SpringUtils;
|
import com.ruoyi.sy.service.ArdSyCarRtuService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.*;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.PreDestroy;
|
|
/**
|
* mqtt客户端
|
*/
|
@Slf4j
|
@Component
|
public class MqttCustomerClient {
|
|
private static MqttClient client;
|
|
private static MqttConnectOptions options;
|
|
private PushCallback pushCallback;
|
|
public static MqttClient getClient(){
|
return client;
|
}
|
|
public static void setClient(MqttClient client){
|
MqttCustomerClient.client=client;
|
}
|
|
public static MqttConnectOptions getOptions() {
|
return options;
|
}
|
|
public static void setOptions(MqttConnectOptions options) {
|
MqttCustomerClient.options = options;
|
}
|
|
/**
|
* 客户端连接
|
*
|
* @param clientID 客户端Id
|
*/
|
public void connect(String clientID){
|
MqttClient client;
|
try {
|
ArdSyCarRtuService carRtuService = SpringUtils.getBean(ArdSyCarRtuService.class);
|
//获取电磁锁地址
|
String mqttUrl = carRtuService.url("ElectromagneticLockUrl");
|
client=new MqttClient(mqttUrl,clientID,new MemoryPersistence());
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setCleanSession(true);
|
options.setUserName("admin");
|
options.setPassword("xzx12345".toCharArray());
|
//options.setConnectionTimeout(100);
|
//options.setKeepAliveInterval(60);
|
options.setAutomaticReconnect(true);
|
MqttCustomerClient.setClient(client);
|
MqttCustomerClient.setOptions(options);
|
try {
|
client.setCallback(new PushCallback());
|
client.connect(options);
|
}catch (Exception e){
|
e.printStackTrace();
|
}
|
}catch (Exception e){
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 发布
|
* @param topic
|
* @param pushMessage
|
*/
|
public void pushlish(String topic,String pushMessage) throws MqttException {
|
pushlish(2,false,topic,pushMessage);
|
}
|
|
/**
|
* 发布
|
*
|
* @param qos 连接方式
|
* @param retained 是否保留
|
* @param topic 主题
|
* @param pushMessage 消息体
|
*/
|
public void pushlish(int qos,boolean retained,String topic,String pushMessage) throws MqttException {
|
MqttMessage message=new MqttMessage();
|
message.setQos(qos);
|
message.setRetained(retained);
|
message.setPayload(pushMessage.getBytes());
|
try{
|
client.publish(topic,message);
|
}catch (Exception e){
|
log.error(topic+"发布失败!");
|
}
|
client.disconnect();
|
}
|
|
/**
|
* 订阅某个主题
|
* @param topic
|
*/
|
public void subscribe(String topic){
|
log.info("开始订阅主题" + topic);
|
subscribe(topic,2);
|
}
|
|
public void subscribe(String topic,int qos){
|
try {
|
MqttCustomerClient.getClient().subscribe(topic,qos);
|
}catch (MqttException e){
|
e.printStackTrace();
|
}
|
}
|
|
//取消订阅
|
public void unSubscribe(String topicFilter){
|
try {
|
client.unsubscribe(topicFilter);
|
} catch (MqttException e) {
|
System.out.println("取消订阅主题"+topicFilter+"失败");
|
e.getMessage();
|
}
|
}
|
|
//断开连接
|
@PreDestroy//该注解对象销毁后触发
|
public void disConnect(){
|
try {
|
client.disconnect();
|
} catch (MqttException e) {
|
System.out.println("断开连接产生异常");
|
e.getMessage();
|
}
|
}
|
}
|