package com.ruoyi.utils.mqtt;

import com.ruoyi.common.utils.spring.SpringUtils;
import com.ruoyi.sy.domain.ArdSyCarRtu;
import com.ruoyi.sy.mapper.ArdSyCarRtuMapper;
import com.ruoyi.sy.service.ArdSyCarRtuService;
import com.ruoyi.system.service.ISysConfigService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Creates one MQTT client per vehicle RTU for the electromagnetic-lock broker and
 * offers a one-shot publish helper for sending lock commands.
 *
 * <p>On startup ({@link #init()}) every RTU returned by
 * {@link ArdSyCarRtuService#allCarTopicList()} gets its own client, identified by the
 * car id and wired to a {@code MqttOnceCallback}.
 *
 * @author Administrator
 */
@Slf4j
@Component
public class MqttOnce {

    /** Configuration key passed to {@code ArdSyCarRtuService.url(...)} to resolve the broker URL. */
    private static final String LOCK_URL_KEY = "ElectromagneticLockUrl";

    // NOTE(review): broker credentials are hard-coded in source; they should be moved to
    // externalised configuration. Values are kept unchanged here to preserve behavior.
    private static final String MQTT_USERNAME = "admin";
    private static final String MQTT_PASSWORD = "xzx12345";

    /** Connection timeout handed to Paho (seconds). */
    private static final int CONNECTION_TIMEOUT_SECONDS = 100;

    /** Keep-alive (heartbeat) interval handed to Paho (seconds). */
    private static final int KEEP_ALIVE_SECONDS = 60;

    /** Value passed to {@code MqttOnceCallback} and used as the publish QoS. */
    private static final int QOS = 2;

    @Autowired
    private ArdSyCarRtuService ardSyCarRtuService;

    /**
     * Connects an MQTT client for every car RTU known to the service.
     *
     * <p>A failure for one car is logged by {@link #subscribeCar(ArdSyCarRtu)} and does
     * not prevent the remaining cars from being processed.
     */
    @PostConstruct
    public void init() {
        List<ArdSyCarRtu> rtus = ardSyCarRtuService.allCarTopicList();
        if (rtus == null) {
            // Nothing to connect; avoid failing application startup with an NPE.
            return;
        }
        for (ArdSyCarRtu rtu : rtus) {
            subscribeCar(rtu);
        }
    }

    /**
     * Creates and connects the MQTT client for a single car.
     *
     * <p>The client id is the car id. This method installs a {@code MqttOnceCallback}
     * (given the client, the connect options, the car's subscribe topic and {@code 2})
     * and then connects; it does not call {@code subscribe} itself — presumably the
     * callback does that once connected (confirm against {@code MqttOnceCallback}).
     *
     * <p>Any exception is logged and swallowed so that callers are never interrupted.
     *
     * @param ardSyCarRtu the RTU record supplying the car id and subscribe topic;
     *                    {@code null} is ignored with a warning
     */
    public void subscribeCar(ArdSyCarRtu ardSyCarRtu) {
        if (ardSyCarRtu == null) {
            log.warn("Skipping MQTT connection: RTU record is null");
            return;
        }
        String carId = ardSyCarRtu.getCarId();
        String subscribe = ardSyCarRtu.getTopicSubscribe();
        try {
            MqttClient client = getMqttClient(carId);
            MqttConnectOptions options = getMqttConnectOptions();
            // The callback must be installed before connecting so no event is missed.
            client.setCallback(new MqttOnceCallback(client, options, subscribe, QOS));
            client.connect(options);
        } catch (Exception e) {
            log.error("Failed to connect MQTT client for car {} (topic {})", carId, subscribe, e);
        }
    }

    /**
     * Builds an unconnected MQTT client for the electromagnetic-lock broker.
     *
     * <p>The service is looked up through {@link SpringUtils} rather than the injected
     * field, so this also works on instances that were not created by Spring.
     *
     * @param carId the MQTT client id to use (the car id for per-car clients)
     * @return a new, not yet connected client
     * @throws MqttException if Paho rejects the URL or client id
     */
    public MqttClient getMqttClient(String carId) throws MqttException {
        ArdSyCarRtuService carRtuService = SpringUtils.getBean(ArdSyCarRtuService.class);
        // Resolve the electromagnetic-lock broker address.
        String mqttUrl = carRtuService.url(LOCK_URL_KEY);
        return new MqttClient(mqttUrl, carId);
    }

    /**
     * Builds the connect options shared by all clients created in this class.
     *
     * @return options with credentials, a 100 s connection timeout, a 60 s keep-alive
     *         interval and clean-session enabled
     */
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // Credentials
        options.setUserName(MQTT_USERNAME);
        options.setPassword(MQTT_PASSWORD.toCharArray());
        // Connection timeout
        options.setConnectionTimeout(CONNECTION_TIMEOUT_SECONDS);
        // Session heartbeat interval
        options.setKeepAliveInterval(KEEP_ALIVE_SECONDS);
        // Start with a clean session on every connect
        options.setCleanSession(true);
        return options;
    }

    /**
     * Publishes one command message for a car and releases the connection afterwards.
     *
     * <p>A short-lived client is connected for the publish. It deliberately uses a
     * client id that differs from the plain car id: {@link #subscribeCar(ArdSyCarRtu)}
     * already holds a connection under the car id, and an MQTT broker drops an existing
     * connection when a second one arrives with the same client id.
     *
     * @param carId   the car the command is for (used to derive the publisher client id)
     * @param topic   the topic to publish to
     * @param message the payload text; encoded as UTF-8
     * @param type    command name, only used in the returned status text
     * @return a human-readable success message
     * @throws MqttException if the client cannot be created or connected, or the
     *                       publish fails
     */
    public String publishCar(String carId, String topic, String message, String type) throws MqttException {
        String publisherId = carId + "-pub-" + System.nanoTime();
        MqttClient client = getMqttClient(publisherId);
        try {
            // Paho rejects publish() on a client that is not connected.
            client.connect(getMqttConnectOptions());
            MqttMessage msg = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
            msg.setQos(QOS);
            client.publish(topic, msg);
        } finally {
            releaseClient(client);
        }
        return "发布" + type + "指令成功!";
    }

    /** Disconnects (if connected) and closes a client, logging instead of throwing on failure. */
    private void releaseClient(MqttClient client) {
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException e) {
            log.warn("Failed to disconnect MQTT client {}", client.getClientId(), e);
        }
        try {
            client.close();
        } catch (MqttException e) {
            log.warn("Failed to close MQTT client {}", client.getClientId(), e);
        }
    }
}