| | |
| | | package com.ard.utils.mqtt; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.eclipse.paho.client.mqttv3.*; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | | import org.springframework.boot.ApplicationArguments; |
| | | import org.springframework.boot.ApplicationRunner; |
| | | import org.springframework.core.annotation.Order; |
| | | import org.springframework.expression.spel.ast.NullLiteral; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.io.UnsupportedEncodingException; |
| | | |
/**
 * @Description: MQTT consumer client
 * @ClassName: MqttConsumer
 *
 **/
| | | @Component |
| | | @Slf4j(topic = "mqtt") |
| | | @Order(1) |
| | | public class MqttConsumer implements ApplicationRunner { |
| | | |
| | | private static MqttClient client; |
| | |
| | | @Override |
| | | public void run(ApplicationArguments args) { |
| | | log.info("初始化并启动mqtt......"); |
| | | if(PropertiesUtil.MQTT_ENABLED) |
| | | { |
| | | if (PropertiesUtil.MQTT_ENABLED) { |
| | | this.connect(); |
| | | } |
| | | } |
| | |
| | | } |
| | | |
/**
 * Create the client --- 1 ---
 */
| | | public void getClient() { |
| | | try { |
| | |
| | | } |
| | | |
/**
 * Build the connection options object (username, password, etc.) --- 2 ---
 */
| | | public MqttConnectOptions getOptions() { |
| | | MqttConnectOptions options = new MqttConnectOptions(); |
| | |
| | | } |
| | | |
/**
 * qos --- 3 ---
 */
| | | public int[] getQos(int length) { |
| | | |
| | |
| | | } |
| | | |
/**
 * Wire up the various instances and subscribe to the topics --- 4 ---
 */
| | | public void create(MqttConnectOptions options, String[] topic, int[] qos) { |
| | | try { |
| | |
| | | |
| | | /** |
| | | * 发布,非持久化 |
| | | * |
| | | * qos根据文档设置为1 |
| | | * <p> |
| | | * qos根据文档设置为1 |
| | | * |
| | | * @param topic |
| | | * @param msg |
| | |
| | | * 发布 |
| | | */ |
| | | public static void publish(int qos, boolean retained, String topic, String pushMessage) { |
| | | log.info("【主题】:" + topic + "【qos】:" + qos + "【pushMessage】:" + pushMessage); |
| | | MqttMessage message = new MqttMessage(); |
| | | message.setQos(qos); |
| | | message.setRetained(retained); |
| | | try { |
| | | message.setPayload(pushMessage.getBytes("UTF-8")); |
| | | } catch (UnsupportedEncodingException e) { |
| | | log.error("mqtt编码异常:" + e.getMessage()); |
| | | } |
| | | MqttTopic mTopic = client.getTopic(topic); |
| | | if (null == mTopic) { |
| | | log.error("topic:" + topic + " 不存在"); |
| | | } |
| | | MqttDeliveryToken token; |
| | | try { |
| | | token = mTopic.publish(message); |
| | | token.waitForCompletion(); |
| | | if (token.isComplete()) { |
| | | log.info("消息发送成功"); |
| | | if (client != null) { |
| | | log.info("【主题】:" + topic + "【qos】:" + qos + "【pushMessage】:" + pushMessage); |
| | | MqttMessage message = new MqttMessage(); |
| | | message.setQos(qos); |
| | | message.setRetained(retained); |
| | | try { |
| | | message.setPayload(pushMessage.getBytes("UTF-8")); |
| | | } catch (UnsupportedEncodingException e) { |
| | | log.error("mqtt编码异常:" + e.getMessage()); |
| | | } |
| | | } catch (MqttPersistenceException e) { |
| | | e.printStackTrace(); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | MqttTopic mTopic = client.getTopic(topic); |
| | | if (null == mTopic) { |
| | | log.error("topic:" + topic + " 不存在"); |
| | | } |
| | | MqttDeliveryToken token; |
| | | try { |
| | | token = mTopic.publish(message); |
| | | token.waitForCompletion(); |
| | | if (token.isComplete()) { |
| | | log.info("消息发送成功"); |
| | | } |
| | | } catch (MqttPersistenceException e) { |
| | | log.error("mqtt持久异常:" + e.getMessage()); |
| | | } catch (MqttException e) { |
| | | log.error("mqtt异常:" + e.getMessage()); |
| | | } |
| | | } |
| | | } |
| | | } |