package com.ard.utils.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

/**
 * MQTT producer client.
 *
 * <p>On application start-up (when {@code spring.mqtt.enabled} is true) this component creates a
 * single shared {@link MqttClient}, connects it using the {@code spring.mqtt.*} settings and
 * installs a {@link MqttProducerCallback}. Messages are then sent through the static
 * {@code publish} methods.
 *
 * <p>All failures are logged rather than thrown, so a broker problem does not abort start-up
 * or propagate into callers of {@code publish}.
 *
 * @Description: MQTT producer client
 * @ClassName: MqttProducer
 * @Author: 刘苏义
 * @Date: 2023-05-29 09:55
 * @Version: 1.0
 **/
@Component
@Slf4j(topic = "mqtt")
@Order(1)
public class MqttProducer implements ApplicationRunner {

    @Value("${spring.mqtt.enabled}")
    private Boolean MQTT_ENABLED;

    @Value("${spring.mqtt.host}")
    private String MQTT_HOST;

    @Value("${spring.mqtt.clientId}")
    private String MQTT_CLIENT_ID;

    @Value("${spring.mqtt.username}")
    private String MQTT_USER_NAME;

    @Value("${spring.mqtt.password}")
    private String MQTT_PASSWORD;

    @Value("${spring.mqtt.timeout}")
    private int MQTT_TIMEOUT;

    @Value("${spring.mqtt.keepalive}")
    private int MQTT_KEEP_ALIVE;

    /** Shared client used by the static {@code publish} methods; stays null until created. */
    private static MqttClient client;

    /**
     * Start-up hook: connects to the broker when MQTT is enabled in the configuration.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        log.debug("初始化并启动mqtt......");
        // Boolean.TRUE.equals avoids an unboxing NPE should the flag ever be null.
        if (Boolean.TRUE.equals(MQTT_ENABLED)) {
            this.connect();
        }
    }

    /**
     * Connects to the MQTT server: creates the client, builds the options, then connects.
     */
    private void connect() {
        try {
            // 1. Create the client
            getClient();
            if (client == null) {
                // getClient() has already logged the cause; there is nothing to connect with.
                log.error("mqtt客户端创建失败,跳过连接");
                return;
            }
            // 2. Build the connection options
            MqttConnectOptions options = getOptions();
            options.setMaxInflight(1000);
            // 3. Install the callback and connect
            create(options);
        } catch (Exception e) {
            log.error("mqtt连接异常:", e);
        }
    }

    /**
     * Step 1: creates the shared client if it does not exist yet.
     *
     * <p>Uses {@link MemoryPersistence}. On failure the error is logged and the shared client
     * is left unset.
     */
    public void getClient() {
        try {
            if (null == client) {
                client = new MqttClient(MQTT_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
            }
            log.debug("--创建mqtt客户端");
        } catch (Exception e) {
            log.error("创建mqtt客户端异常:", e);
        }
    }

    /**
     * Step 2: builds the connection options (user name, password, timeout, keep-alive).
     *
     * @return a new options object populated from the {@code spring.mqtt.*} settings
     */
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // Credentials
        options.setUserName(MQTT_USER_NAME);
        options.setPassword(MQTT_PASSWORD.toCharArray());
        // Connection timeout
        options.setConnectionTimeout(MQTT_TIMEOUT);
        // Keep-alive (heartbeat) interval
        options.setKeepAliveInterval(MQTT_KEEP_ALIVE);
        // Do not clear the session on connect
        options.setCleanSession(false);
        log.debug("--生成mqtt配置对象");
        return options;
    }

    /**
     * Step 3: installs the callback and connects the shared client.
     *
     * <p>Failures are logged and not rethrown.
     *
     * @param options connection options to connect with; also handed to the callback
     */
    public void create(MqttConnectOptions options) {
        MqttClient mqttClient = client;
        if (mqttClient == null) {
            // Guard against being called before getClient() succeeded.
            log.error("mqtt客户端未创建,无法连接");
            return;
        }
        try {
            mqttClient.setCallback(new MqttProducerCallback(mqttClient, options));
            log.debug("--添加回调处理类");
            mqttClient.connect(options);
        } catch (Exception e) {
            log.error("连接并装载回调异常:", e);
        }
    }

    /**
     * Publishes a non-retained message.
     *
     * <p>QoS is set to 2, as required by the documentation.
     *
     * @param topic topic to publish to
     * @param msg   message text, sent as UTF-8
     */
    public static void publish(String topic, String msg) {
        publish(2, false, topic, msg);
    }

    /**
     * Publishes a message and blocks until delivery completes.
     *
     * <p>Nothing is sent when the shared client has not been created, or when {@code topic} or
     * {@code pushMessage} is null. MQTT errors are logged and not rethrown.
     *
     * @param qos         quality-of-service level for the message
     * @param retained    whether the message is flagged as retained
     * @param topic       topic to publish to
     * @param pushMessage message text, sent as UTF-8
     */
    public static void publish(int qos, boolean retained, String topic, String pushMessage) {
        // Read the shared field once so every later use sees the same instance.
        MqttClient mqttClient = client;
        if (mqttClient == null) {
            log.debug("mqtt客户端未初始化,消息未发送");
            return;
        }
        if (topic == null || pushMessage == null) {
            log.error("mqtt发布参数为空,topic:{} pushMessage:{}", topic, pushMessage);
            return;
        }
        log.debug("【主题】:{}【qos】:{}【pushMessage】:{}", topic, qos, pushMessage);

        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        // StandardCharsets has no checked exception, so a payload can never be silently dropped.
        message.setPayload(pushMessage.getBytes(StandardCharsets.UTF_8));

        MqttTopic mTopic = mqttClient.getTopic(topic);
        if (null == mTopic) {
            log.error("topic:" + topic + " 不存在");
            // Stop here: publishing on a null topic would throw a NullPointerException.
            return;
        }

        try {
            MqttDeliveryToken token = mTopic.publish(message);
            token.waitForCompletion();
            if (token.isComplete()) {
                log.debug("消息发送成功");
            }
        } catch (MqttPersistenceException e) {
            log.error("mqtt持久异常:{}", e.getMessage(), e);
        } catch (MqttException e) {
            log.error("mqtt异常:{}", e.getMessage(), e);
        }
    }
}