package com.ard.utils.mqtt;
|
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.*;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.boot.ApplicationArguments;
|
import org.springframework.boot.ApplicationRunner;
|
import org.springframework.core.annotation.Order;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.io.UnsupportedEncodingException;
|
|
/**
|
* @Description: mqtt生产客户端
|
* @ClassName: MqttProducer
|
* @Author: 刘苏义
|
* @Date: 2023年05月29日9:55
|
* @Version: 1.0
|
**/
|
@Component
|
@Slf4j(topic = "mqtt")
|
@Order(1)
|
public class MqttProducer implements ApplicationRunner {
|
@Resource
|
MqttConfiguration mqttConfig;
|
|
private static MqttClient client;
|
|
@Override
|
public void run(ApplicationArguments args) {
|
log.debug("初始化并启动mqtt......");
|
if (mqttConfig.getEnabled()) {
|
this.connect();
|
}
|
}
|
|
/**
|
* 连接mqtt服务器
|
*/
|
private void connect() {
|
try {
|
// 1 创建客户端
|
getClient();
|
// 2 设置配置
|
MqttConnectOptions options = getOptions();
|
options.setMaxInflight(1000);
|
// 3 最后设置
|
create(options);
|
} catch (Exception e) {
|
log.error("mqtt连接异常:" + e);
|
}
|
}
|
|
/**
|
* 创建客户端 --- 1 ---
|
*/
|
public void getClient() {
|
try {
|
if (null == client) {
|
client = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientId(), new MemoryPersistence());
|
}
|
log.debug("--创建mqtt客户端");
|
} catch (Exception e) {
|
log.error("创建mqtt客户端异常:" + e);
|
}
|
}
|
|
/**
|
* 生成配置对象,用户名,密码等 --- 2 ---
|
*/
|
public MqttConnectOptions getOptions() {
|
MqttConnectOptions options = new MqttConnectOptions();
|
//设置用户名密码
|
options.setUserName(mqttConfig.getUsername());
|
options.setPassword(mqttConfig.getPassword().toCharArray());
|
// 设置超时时间
|
options.setConnectionTimeout(mqttConfig.getTimeout());
|
// 设置会话心跳时间
|
options.setKeepAliveInterval(mqttConfig.getKeepalive());
|
// 是否清除session
|
options.setCleanSession(false);
|
log.debug("--生成mqtt配置对象");
|
return options;
|
}
|
|
|
/**
|
* 连接并装载回调 --- 3 ---
|
*/
|
public void create(MqttConnectOptions options) {
|
try {
|
client.setCallback(new MqttProducerCallback(client, options));
|
log.debug("--添加回调处理类");
|
client.connect(options);
|
} catch (Exception e) {
|
log.info("连接并装载回调异常:" + e);
|
}
|
}
|
|
|
|
/**
|
* 发布,非持久化
|
* <p>
|
* qos根据文档设置为2
|
*
|
* @param topic
|
* @param msg
|
*/
|
public static void publish(String topic, String msg) {
|
publish(2, false, topic, msg);
|
}
|
|
/**
|
* 发布
|
*/
|
public static void publish(int qos, boolean retained, String topic, String pushMessage) {
|
if (client != null) {
|
log.debug("【主题】:" + topic + "【qos】:" + qos + "【pushMessage】:" + pushMessage);
|
MqttMessage message = new MqttMessage();
|
message.setQos(qos);
|
message.setRetained(retained);
|
try {
|
message.setPayload(pushMessage.getBytes("UTF-8"));
|
} catch (UnsupportedEncodingException e) {
|
log.error("mqtt编码异常:" + e.getMessage());
|
}
|
MqttTopic mTopic = client.getTopic(topic);
|
if (null == mTopic) {
|
log.error("topic:" + topic + " 不存在");
|
}
|
MqttDeliveryToken token;
|
try {
|
token = mTopic.publish(message);
|
token.waitForCompletion();
|
if (token.isComplete()) {
|
log.debug("消息发送成功");
|
}
|
} catch (MqttPersistenceException e) {
|
log.error("mqtt持久异常:" + e.getMessage());
|
} catch (MqttException e) {
|
log.error("mqtt异常:" + e.getMessage());
|
}
|
}
|
}
|
}
|