package com.dji.sdk.mqtt.status;

import com.dji.sdk.cloudapi.device.UpdateTopo;
import com.dji.sdk.common.Common;
import com.dji.sdk.exception.CloudSDKException;
import com.dji.sdk.mqtt.ChannelName;
import com.dji.sdk.mqtt.MqttGatewayPublish;
import com.fasterxml.jackson.core.type.TypeReference;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;

import static com.dji.sdk.mqtt.TopicConst.*;

/**
 * Integration flows for messages on the status channels.
 *
 * <p>Inbound messages from {@link ChannelName#INBOUND_STATUS} are deserialized into a
 * {@link TopicStatusRequest} and routed to the online or offline channel; messages placed on
 * {@link ChannelName#OUTBOUND_STATUS} are published as replies through
 * {@link MqttGatewayPublish}.
 *
 * @author sean.zhou
 * @date 2021/11/12
 * @version 0.1
 */
@Configuration
public class StatusRouter {

    @Resource
    private MqttGatewayPublish gatewayPublish;

    /**
     * Builds the flow that parses inbound status messages and routes them.
     *
     * <p>The payload (a {@code byte[]}) is deserialized with the shared object mapper into a
     * {@code TopicStatusRequest<UpdateTopo>}. Its {@code from} field is set to the part of the
     * received topic that lies between the {@code BASIC_PRE + PRODUCT} prefix and the
     * {@code STATUS_SUF} suffix. The request is then routed to
     * {@link ChannelName#INBOUND_STATUS_OFFLINE} when its data is {@code null} or carries no
     * sub-devices, and to {@link ChannelName#INBOUND_STATUS_ONLINE} otherwise.
     *
     * @return the integration flow bean
     * @throws CloudSDKException (at message-handling time) if the payload cannot be deserialized
     */
    @Bean
    public IntegrationFlow statusRouterFlow() {
        return IntegrationFlows
                .from(ChannelName.INBOUND_STATUS)
                .transform(Message.class, source -> {
                    try {
                        // The explicit type arguments are required so that Jackson binds the
                        // "data" node to UpdateTopo instead of a generic map.
                        TopicStatusRequest<UpdateTopo> response = Common.getObjectMapper().readValue(
                                (byte[]) source.getPayload(),
                                new TypeReference<TopicStatusRequest<UpdateTopo>>() {});
                        String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
                        // NOTE(review): if the topic header is absent (String.valueOf yields "null")
                        // or does not contain STATUS_SUF, indexOf returns -1 and substring throws
                        // StringIndexOutOfBoundsException - confirm the inbound adapter only
                        // delivers status topics to this channel.
                        return response.setFrom(
                                topic.substring((BASIC_PRE + PRODUCT).length(), topic.indexOf(STATUS_SUF)));
                    } catch (IOException e) {
                        throw new CloudSDKException(e);
                    }
                }, null)
                .<TopicStatusRequest<UpdateTopo>, Boolean>route(
                        // true  -> no data or no sub-devices -> offline channel
                        // false -> at least one sub-device   -> online channel
                        response -> Optional.ofNullable(response.getData())
                                .map(UpdateTopo::getSubDevices)
                                .map(CollectionUtils::isEmpty)
                                .orElse(true),
                        mapping -> mapping
                                .channelMapping(true, ChannelName.INBOUND_STATUS_OFFLINE)
                                .channelMapping(false, ChannelName.INBOUND_STATUS_ONLINE))
                .get();
    }

    /**
     * Builds the flow that publishes status replies taken from
     * {@link ChannelName#OUTBOUND_STATUS}; the handler's result is discarded via the null channel.
     *
     * @return the integration flow bean
     */
    @Bean
    public IntegrationFlow replySuccessStatus() {
        return IntegrationFlows
                .from(ChannelName.OUTBOUND_STATUS)
                .handle(this::publish)
                .nullChannel();
    }

    /**
     * Publishes the given reply through the gateway.
     *
     * @param request the reply to publish; when {@code null} nothing is published
     * @param headers the headers of the message being handled, passed on to the gateway
     * @return {@code request} after publishing, or {@code null} when {@code request} is {@code null}
     */
    private TopicStatusResponse publish(TopicStatusResponse request, MessageHeaders headers) {
        if (Objects.isNull(request)) {
            return null;
        }
        gatewayPublish.publishReply(request, headers);
        return request;
    }
}