package com.ruoyi.utils.qymqtt.newM; import javax.annotation.PreDestroy; import com.ruoyi.common.utils.spring.SpringUtils; import com.ruoyi.sy.service.ArdSyCarRtuService; import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClientPersistence; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class EmqClient { private IMqttClient mqttClient; private MessageCallback messageCallback; public EmqClient(String clientId){ ArdSyCarRtuService carRtuService = SpringUtils.getBean(ArdSyCarRtuService.class); //获取电磁锁地址 String mqttUrl = carRtuService.url("ElectromagneticLockUrl"); MqttClientPersistence mempersitence = new MemoryPersistence();//持久化方式 try { this.mqttClient = new MqttClient(mqttUrl,clientId,mempersitence);//设置连接broker的host及clientId this.messageCallback = new MessageCallback(); } catch (MqttException e) { e.getMessage(); } } //连接broker public void connect(){ MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setUserName("admin"); options.setPassword("xzx12345".toCharArray()); options.setCleanSession(true);//设置为临时会话 mqttClient.setCallback(messageCallback);//设置回调 try { mqttClient.connect(options); } catch (MqttException e) { System.out.println("mqtt客户端连接服务端失败"); e.getMessage(); } } //断开连接 @PreDestroy//该注解对象销毁后触发 public void disConnect(){ try { mqttClient.disconnect(); } catch (MqttException e) { System.out.println("断开连接产生异常"); e.getMessage(); } } //重连 public void reConnect(){ try { mqttClient.reconnect(); } catch (MqttException e) { System.out.println("重连失败,失败原因"); e.getMessage(); } } //发布消息 传参为主题,消息,等级,是否保留消息 public void publish(String topic, String msg, QosEnum qos, boolean retain){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(msg.getBytes()); mqttMessage.setQos(qos.value()); mqttMessage.setRetained(retain); try { mqttClient.publish(topic,mqttMessage); mqttClient.disconnect(); } catch (MqttException e) { System.out.println("发布消息失败"); System.out.println("主题:"+topic); System.out.println("消息:"+msg); System.out.println("qos等级:"+qos.value()); e.getMessage(); } } //订阅 public void subscribe(String topicFilter,QosEnum qos){ try { mqttClient.subscribe(topicFilter,qos.value()); } catch (MqttException e) { System.out.println("订阅主题"+topicFilter+"失败"); System.out.println("qos等级:"+qos.value()); e.getMessage(); } } //取消订阅 public void unSubscribe(String topicFilter){ try { mqttClient.unsubscribe(topicFilter); } catch (MqttException e) { System.out.println("取消订阅主题"+topicFilter+"失败"); e.getMessage(); } } }