package com.ard.utils.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; /** * @Description: mqtt消费客户端 * @ClassName: MqttConsumer * @Author: 刘苏义 * @Date: 2023年05月29日9:55 * @Version: 1.0 **/ @Component @Slf4j(topic = "mqtt") public class MqttConsumer implements ApplicationRunner { private static MqttClient client; @Override public void run(ApplicationArguments args) { log.info("初始化并启动mqtt......"); if(PropertiesUtil.MQTT_ENABLED) { this.connect(); } } /** * 连接mqtt服务器 */ private void connect() { try { // 1 创建客户端 getClient(); // 2 设置配置 MqttConnectOptions options = getOptions(); String[] topic = PropertiesUtil.MQTT_TOPIC.split(","); // 3 消息发布质量 int[] qos = getQos(topic.length); // 4 最后设置 create(options, topic, qos); } catch (Exception e) { log.error("mqtt连接异常:" + e); } } /** * 创建客户端 --- 1 --- */ public void getClient() { try { if (null == client) { client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence()); } log.info("--创建mqtt客户端"); } catch (Exception e) { log.error("创建mqtt客户端异常:" + e); } } /** * 生成配置对象,用户名,密码等 --- 2 --- */ public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); //设置用户名密码 options.setUserName(PropertiesUtil.MQTT_USER_NAME); options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray()); // 设置超时时间 options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT); // 设置会话心跳时间 options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE); // 是否清除session options.setCleanSession(true); log.info("--生成mqtt配置对象"); return options; } /** * qos --- 3 --- */ public int[] getQos(int length) { int[] qos = new int[length]; for (int i = 0; i < length; i++) { /** * MQTT协议中有三种消息发布服务质量: * * QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。 * QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。 * QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大 */ qos[i] = 1; } log.info("--设置消息发布质量"); return qos; } /** * 装载各种实例和订阅主题 --- 4 --- */ public void create(MqttConnectOptions options, String[] topic, int[] qos) { try { client.setCallback(new MqttConsumerCallback(client, options, topic, qos)); log.info("--添加回调处理类"); client.connect(options); } catch (Exception e) { log.info("装载实例或订阅主题异常:" + e); } } /** * 订阅某个主题 * * @param topic * @param qos */ public void subscribe(String topic, int qos) { try { log.info("topic:" + topic); client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布,非持久化 * * qos根据文档设置为1 * * @param topic * @param msg */ public static void publish(String topic, String msg) { publish(1, false, topic, msg); } /** * 发布 */ public static void publish(int qos, boolean retained, String topic, String pushMessage) { log.info("【主题】:" + topic + "【qos】:" + qos + "【pushMessage】:" + pushMessage); MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); try { message.setPayload(pushMessage.getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { log.error("mqtt编码异常:" + e.getMessage()); } MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) { log.error("topic:" + topic + " 不存在"); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); if (token.isComplete()) { log.info("消息发送成功"); } } catch (MqttPersistenceException e) { log.error("mqtt持久异常:" + e.getMessage()); } catch (MqttException e) { log.error("mqtt异常:" + e.getMessage()); } } }