package com.ruoyi.utils.qymqtt.oldM; import com.ruoyi.common.utils.spring.SpringUtils; import com.ruoyi.sy.service.ArdSyCarRtuService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; /** * mqtt客户端 */ @Slf4j @Component public class MqttCustomerClient { private static MqttClient client; private static MqttConnectOptions options; private PushCallback pushCallback; public static MqttClient getClient(){ return client; } public static void setClient(MqttClient client){ MqttCustomerClient.client=client; } public static MqttConnectOptions getOptions() { return options; } public static void setOptions(MqttConnectOptions options) { MqttCustomerClient.options = options; } /** * 客户端连接 * * @param clientID 客户端Id */ public void connect(String clientID){ MqttClient client; try { ArdSyCarRtuService carRtuService = SpringUtils.getBean(ArdSyCarRtuService.class); //获取电磁锁地址 String mqttUrl = carRtuService.url("ElectromagneticLockUrl"); client=new MqttClient(mqttUrl,clientID,new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName("admin"); options.setPassword("xzx12345".toCharArray()); //options.setConnectionTimeout(100); //options.setKeepAliveInterval(60); options.setAutomaticReconnect(true); MqttCustomerClient.setClient(client); MqttCustomerClient.setOptions(options); try { client.setCallback(new PushCallback()); client.connect(options); }catch (Exception e){ e.printStackTrace(); } }catch (Exception e){ e.printStackTrace(); } } /** * 发布 * @param topic * @param pushMessage */ public void pushlish(String topic,String pushMessage) throws MqttException { pushlish(2,false,topic,pushMessage); } /** * 发布 * * @param qos 连接方式 * @param retained 是否保留 * @param topic 主题 * @param pushMessage 消息体 */ public void pushlish(int qos,boolean retained,String topic,String pushMessage) throws MqttException { MqttMessage message=new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); try{ client.publish(topic,message); }catch (Exception e){ log.error(topic+"发布失败!"); } client.disconnect(); } /** * 订阅某个主题 * @param topic */ public void subscribe(String topic){ log.info("开始订阅主题" + topic); subscribe(topic,2); } public void subscribe(String topic,int qos){ try { MqttCustomerClient.getClient().subscribe(topic,qos); }catch (MqttException e){ e.printStackTrace(); } } //取消订阅 public void unSubscribe(String topicFilter){ try { client.unsubscribe(topicFilter); } catch (MqttException e) { System.out.println("取消订阅主题"+topicFilter+"失败"); e.getMessage(); } } //断开连接 @PreDestroy//该注解对象销毁后触发 public void disConnect(){ try { client.disconnect(); } catch (MqttException e) { System.out.println("断开连接产生异常"); e.getMessage(); } } }