package com.dji.sample.wayline.service.impl; import com.dji.sample.common.error.CommonErrorEnum; import com.dji.sample.common.model.CustomClaim; import com.dji.sample.component.mqtt.model.EventsReceiver; import com.dji.sample.component.redis.RedisConst; import com.dji.sample.component.redis.RedisOpsUtils; import com.dji.sample.component.websocket.service.IWebSocketMessageService; import com.dji.sample.component.websocketWmm.WebSocketServerPlayBack; import com.dji.sample.manage.model.dto.DeviceDTO; import com.dji.sample.manage.service.IDeviceRedisService; import com.dji.sample.media.model.MediaFileCountDTO; import com.dji.sample.media.service.IMediaRedisService; import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey; import com.dji.sample.wayline.model.dto.WaylineJobDTO; import com.dji.sample.wayline.model.dto.WaylineTaskConditionDTO; import com.dji.sample.wayline.model.enums.WaylineErrorCodeEnum; import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum; import com.dji.sample.wayline.model.param.CreateJobParam; import com.dji.sample.wayline.model.param.UpdateJobParam; import com.dji.sample.wayline.service.IFlightTaskService; import com.dji.sample.wayline.service.IWaylineFileService; import com.dji.sample.wayline.service.IWaylineJobService; import com.dji.sample.wayline.service.IWaylineRedisService; import com.dji.sdk.cloudapi.device.ExitWaylineWhenRcLostEnum; import com.dji.sdk.cloudapi.media.UploadFlighttaskMediaPrioritize; import com.dji.sdk.cloudapi.media.api.AbstractMediaService; import com.dji.sdk.cloudapi.wayline.*; import com.dji.sdk.cloudapi.wayline.api.AbstractWaylineService; import com.dji.sdk.common.HttpResultResponse; import com.dji.sdk.common.SDKManager; import com.dji.sdk.mqtt.MqttReply; import com.dji.sdk.mqtt.events.TopicEventsRequest; import com.dji.sdk.mqtt.events.TopicEventsResponse; import com.dji.sdk.mqtt.requests.TopicRequestsRequest; import com.dji.sdk.mqtt.requests.TopicRequestsResponse; import com.dji.sdk.mqtt.services.ServicesReplyData; import com.dji.sdk.mqtt.services.TopicServicesResponse; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.messaging.MessageHeaders; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.net.URL; import java.sql.SQLException; import java.time.*; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * @author sean * @version 1.1 * @date 2022/6/9 */ @Service @Slf4j public class FlightTaskServiceImpl extends AbstractWaylineService implements IFlightTaskService { @Autowired private ObjectMapper mapper; @Autowired private IWebSocketMessageService websocketMessageService; @Autowired private IWaylineJobService waylineJobService; @Autowired private IDeviceRedisService deviceRedisService; @Autowired private IWaylineRedisService waylineRedisService; @Autowired private IMediaRedisService mediaRedisService; @Autowired private IWaylineFileService waylineFileService; @Autowired private SDKWaylineService abstractWaylineService; @Autowired @Qualifier("mediaServiceImpl") private AbstractMediaService abstractMediaService; @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) public void checkScheduledJob() { Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE); if (Objects.isNull(jobIdValue)) { return; } log.info("Check the timed tasks of the wayline. {}", jobIdValue); // format: {workspace_id}:{dock_sn}:{job_id} String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER); double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); long now = System.currentTimeMillis(); int offset = 30_000; // Expired tasks are deleted directly. if (time < now - offset) { RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); waylineJobService.updateJob(WaylineJobDTO.builder() .jobId(jobArr[2]) .status(WaylineJobStatusEnum.FAILED.getVal()) .executeTime(LocalDateTime.now()) .completedTime(LocalDateTime.now()) .code(HttpStatus.SC_REQUEST_TIMEOUT).build()); return; } if (now <= time && time <= now + offset) { try { this.executeFlightTask(jobArr[0], jobArr[2]); } catch (Exception e) { log.info("The scheduled task delivery failed."); waylineJobService.updateJob(WaylineJobDTO.builder() .jobId(jobArr[2]) .status(WaylineJobStatusEnum.FAILED.getVal()) .executeTime(LocalDateTime.now()) .completedTime(LocalDateTime.now()) .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build()); } finally { RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); } } } @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) public void prepareConditionJob() { Optional jobKeyOpt = waylineRedisService.getNearestConditionalWaylineJob(); if (jobKeyOpt.isEmpty()) { return; } ConditionalWaylineJobKey jobKey = jobKeyOpt.get(); log.info("Check the conditional tasks of the wayline. {}", jobKey.toString()); // format: {workspace_id}:{dock_sn}:{job_id} double time = waylineRedisService.getConditionalWaylineJobTime(jobKey); long now = System.currentTimeMillis(); // prepare the task one day in advance. int offset = 86_400_000; if (now + offset < time) { return; } WaylineJobDTO job = WaylineJobDTO.builder() .jobId(jobKey.getJobId()) .status(WaylineJobStatusEnum.FAILED.getVal()) .executeTime(LocalDateTime.now()) .completedTime(LocalDateTime.now()) .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build(); try { Optional waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId()); if (waylineJobOpt.isEmpty()) { job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getCode()); waylineJobService.updateJob(job); waylineRedisService.removePrepareConditionalWaylineJob(jobKey); return; } WaylineJobDTO waylineJob = waylineJobOpt.get(); HttpResultResponse result = this.publishOneFlightTask(waylineJob); waylineRedisService.removePrepareConditionalWaylineJob(jobKey); if (HttpResultResponse.CODE_SUCCESS == result.getCode()) { return; } // If the end time is exceeded, no more retries will be made. waylineRedisService.delConditionalWaylineJob(jobKey.getJobId()); if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - RedisConst.WAYLINE_JOB_BLOCK_TIME * 1000 < now) { return; } // Retry if the end time has not been exceeded. this.retryPrepareJob(jobKey, waylineJob); } catch (Exception e) { log.info("Failed to prepare the conditional task."); waylineJobService.updateJob(job); } } /** * For immediate tasks, the server time shall prevail. * @param param */ private void fillImmediateTime(CreateJobParam param) { if (TaskTypeEnum.IMMEDIATE != param.getTaskType()) { return; } long now = System.currentTimeMillis() / 1000; param.setTaskDays(List.of(now)); param.setTaskPeriods(List.of(List.of(now))); } private void addConditions(WaylineJobDTO waylineJob, CreateJobParam param, Long beginTime, Long endTime) { if (TaskTypeEnum.CONDITIONAL != param.getTaskType()) { return; } waylineJob.setConditions( WaylineTaskConditionDTO.builder() .executableConditions(Objects.nonNull(param.getMinStorageCapacity()) ? new ExecutableConditions().setStorageCapacity(param.getMinStorageCapacity()) : null) .readyConditions(new ReadyConditions() .setBatteryCapacity(param.getMinBatteryCapacity()) .setBeginTime(beginTime) .setEndTime(endTime)) .build()); waylineRedisService.setConditionalWaylineJob(waylineJob); // key: wayline_job_condition, value: {workspace_id}:{dock_sn}:{job_id} boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(waylineJob); if (!isAdd) { throw new RuntimeException("Failed to create conditional job."); } } @Override public HttpResultResponse publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException { fillImmediateTime(param); for (Long taskDay : param.getTaskDays()) { LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(taskDay), ZoneId.systemDefault()); for (List taskPeriod : param.getTaskPeriods()) { long beginTime = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(0)), ZoneId.systemDefault())) .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); long endTime = taskPeriod.size() > 1 ? LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(1)), ZoneId.systemDefault())) .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() : beginTime; if (TaskTypeEnum.IMMEDIATE != param.getTaskType() && endTime < System.currentTimeMillis()) { continue; } Optional waylineJobOpt = waylineJobService.createWaylineJob(param, customClaim.getWorkspaceId(), customClaim.getUsername(), beginTime, endTime); if (waylineJobOpt.isEmpty()) { throw new SQLException("Failed to create wayline job."); } WaylineJobDTO waylineJob = waylineJobOpt.get(); // If it is a conditional task type, add conditions to the job parameters. addConditions(waylineJob, param, beginTime, endTime); HttpResultResponse response = this.publishOneFlightTask(waylineJob); if (HttpResultResponse.CODE_SUCCESS != response.getCode()) { return response; } } } return HttpResultResponse.success(); } public HttpResultResponse publishOneFlightTask(WaylineJobDTO waylineJob) throws SQLException { boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn()); if (!isOnline) { throw new RuntimeException("Dock is offline."); } boolean isSuccess = this.prepareFlightTask(waylineJob); if (!isSuccess) { return HttpResultResponse.error("Failed to prepare job."); } // Issue an immediate task execution command. if (TaskTypeEnum.IMMEDIATE == waylineJob.getTaskType()) { if (!executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId())) { return HttpResultResponse.error("Failed to execute job."); } } if (TaskTypeEnum.TIMED == waylineJob.getTaskType()) { // key: wayline_job_timed, value: {workspace_id}:{dock_sn}:{job_id} boolean isAdd = RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, waylineJob.getWorkspaceId() + RedisConst.DELIMITER + waylineJob.getDockSn() + RedisConst.DELIMITER + waylineJob.getJobId(), waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); if (!isAdd) { return HttpResultResponse.error("Failed to create scheduled job."); } } return HttpResultResponse.success(); } private Boolean prepareFlightTask(WaylineJobDTO waylineJob) throws SQLException { // get wayline file Optional waylineFile = waylineFileService.getWaylineByWaylineId(waylineJob.getWorkspaceId(), waylineJob.getFileId()); if (waylineFile.isEmpty()) { throw new SQLException("Wayline file doesn't exist."); } // get file url URL url = waylineFileService.getObjectUrl(waylineJob.getWorkspaceId(), waylineFile.get().getId()); FlighttaskPrepareRequest flightTask = new FlighttaskPrepareRequest() .setFlightId(waylineJob.getJobId()) .setExecuteTime(waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) .setTaskType(waylineJob.getTaskType()) .setWaylineType(waylineJob.getWaylineType()) .setRthAltitude(waylineJob.getRthAltitude()) .setOutOfControlAction(waylineJob.getOutOfControlAction()) .setExitWaylineWhenRcLost(ExitWaylineWhenRcLostEnum.EXECUTE_RC_LOST_ACTION) .setFile(new FlighttaskFile() .setUrl(url.toString()) .setFingerprint(waylineFile.get().getSign())); if (TaskTypeEnum.CONDITIONAL == waylineJob.getTaskType()) { if (Objects.isNull(waylineJob.getConditions())) { throw new IllegalArgumentException(); } flightTask.setReadyConditions(waylineJob.getConditions().getReadyConditions()); flightTask.setExecutableConditions(waylineJob.getConditions().getExecutableConditions()); } TopicServicesResponse serviceReply = abstractWaylineService.flighttaskPrepare( SDKManager.getDeviceSDK(waylineJob.getDockSn()), flightTask,""); if (!serviceReply.getData().getResult().isSuccess()) { log.info("Prepare task ====> Error code: {}", serviceReply.getData().getResult()); waylineJobService.updateJob(WaylineJobDTO.builder() .workspaceId(waylineJob.getWorkspaceId()) .jobId(waylineJob.getJobId()) .executeTime(LocalDateTime.now()) .status(WaylineJobStatusEnum.FAILED.getVal()) .completedTime(LocalDateTime.now()) .code(serviceReply.getData().getResult().getCode()).build()); return false; } return true; } @Override public Boolean executeFlightTask(String workspaceId, String jobId) { // get job Optional waylineJob = waylineJobService.getJobByJobId(workspaceId, jobId); if (waylineJob.isEmpty()) { throw new IllegalArgumentException("Job doesn't exist."); } boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.get().getDockSn()); if (!isOnline) { throw new RuntimeException("Dock is offline."); } WaylineJobDTO job = waylineJob.get(); TopicServicesResponse serviceReply = abstractWaylineService.flighttaskExecute( SDKManager.getDeviceSDK(job.getDockSn()), new FlighttaskExecuteRequest().setFlightId(jobId),""); if (!serviceReply.getData().getResult().isSuccess()) { log.info("Execute job ====> Error: {}", serviceReply.getData().getResult()); waylineJobService.updateJob(WaylineJobDTO.builder() .jobId(jobId) .executeTime(LocalDateTime.now()) .status(WaylineJobStatusEnum.FAILED.getVal()) .completedTime(LocalDateTime.now()) .code(serviceReply.getData().getResult().getCode()).build()); // The conditional task fails and enters the blocking status. if (TaskTypeEnum.CONDITIONAL == job.getTaskType() && WaylineErrorCodeEnum.find(serviceReply.getData().getResult().getCode()).isBlock()) { waylineRedisService.setBlockedWaylineJob(job.getDockSn(), jobId); } return false; } waylineJobService.updateJob(WaylineJobDTO.builder() .jobId(jobId) .executeTime(LocalDateTime.now()) .status(WaylineJobStatusEnum.IN_PROGRESS.getVal()) .build()); waylineRedisService.setRunningWaylineJob(job.getDockSn(), EventsReceiver.builder().bid(jobId).sn(job.getDockSn()).build()); return true; } @Override public void cancelFlightTask(String workspaceId, Collection jobIds) { List waylineJobs = waylineJobService.getJobsByConditions(workspaceId, jobIds, WaylineJobStatusEnum.PENDING); Set waylineJobIds = waylineJobs.stream().map(WaylineJobDTO::getJobId).collect(Collectors.toSet()); // Check if the task status is correct. boolean isErr = !jobIds.removeAll(waylineJobIds) || !jobIds.isEmpty() ; if (isErr) { throw new IllegalArgumentException("These tasks have an incorrect status and cannot be canceled. " + Arrays.toString(jobIds.toArray())); } // Group job id by dock sn. Map> dockJobs = waylineJobs.stream() .collect(Collectors.groupingBy(WaylineJobDTO::getDockSn, Collectors.mapping(WaylineJobDTO::getJobId, Collectors.toList()))); dockJobs.forEach((dockSn, idList) -> this.publishCancelTask(workspaceId, dockSn, idList)); } public void publishCancelTask(String workspaceId, String dockSn, List jobIds) { boolean isOnline = deviceRedisService.checkDeviceOnline(dockSn); if (!isOnline) { throw new RuntimeException("Dock is offline."); } TopicServicesResponse serviceReply = abstractWaylineService.flighttaskUndo(SDKManager.getDeviceSDK(dockSn), new FlighttaskUndoRequest().setFlightIds(jobIds)); if (!serviceReply.getData().getResult().isSuccess()) { log.info("Cancel job ====> Error: {}", serviceReply.getData().getResult()); throw new RuntimeException("Failed to cancel the wayline job of " + dockSn); } for (String jobId : jobIds) { waylineJobService.updateJob(WaylineJobDTO.builder() .workspaceId(workspaceId) .jobId(jobId) .status(WaylineJobStatusEnum.CANCEL.getVal()) .completedTime(LocalDateTime.now()) .build()); RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, workspaceId + RedisConst.DELIMITER + dockSn + RedisConst.DELIMITER + jobId); } } @Override public void uploadMediaHighestPriority(String workspaceId, String jobId) { Optional jobOpt = waylineJobService.getJobByJobId(workspaceId, jobId); if (jobOpt.isEmpty()) { throw new RuntimeException(CommonErrorEnum.ILLEGAL_ARGUMENT.getMessage()); } String dockSn = jobOpt.get().getDockSn(); String key = RedisConst.MEDIA_HIGHEST_PRIORITY_PREFIX + dockSn; if (RedisOpsUtils.checkExist(key) && jobId.equals(((MediaFileCountDTO) RedisOpsUtils.get(key)).getJobId())) { return; } TopicServicesResponse reply = abstractMediaService.uploadFlighttaskMediaPrioritize( SDKManager.getDeviceSDK(dockSn), new UploadFlighttaskMediaPrioritize().setFlightId(jobId)); if (!reply.getData().getResult().isSuccess()) { throw new RuntimeException("Failed to set media job upload priority. Error: " + reply.getData().getResult()); } } @Override public void updateJobStatus(String workspaceId, String jobId, UpdateJobParam param) { Optional waylineJobOpt = waylineJobService.getJobByJobId(workspaceId, jobId); if (waylineJobOpt.isEmpty()) { throw new RuntimeException("The job does not exist."); } WaylineJobDTO waylineJob = waylineJobOpt.get(); WaylineJobStatusEnum statusEnum = waylineJobService.getWaylineState(waylineJob.getDockSn()); if (statusEnum.getEnd() || WaylineJobStatusEnum.PENDING == statusEnum) { throw new RuntimeException("The wayline job status does not match, and the operation cannot be performed."); } switch (param.getStatus()) { case PAUSE: pauseJob(workspaceId, waylineJob.getDockSn(), jobId, statusEnum); break; case RESUME: resumeJob(workspaceId, waylineJob.getDockSn(), jobId, statusEnum); break; } } private void pauseJob(String workspaceId, String dockSn, String jobId, WaylineJobStatusEnum statusEnum) { if (WaylineJobStatusEnum.PAUSED == statusEnum && jobId.equals(waylineRedisService.getPausedWaylineJobId(dockSn))) { waylineRedisService.setPausedWaylineJob(dockSn, jobId); return; } TopicServicesResponse reply = abstractWaylineService.flighttaskPause(SDKManager.getDeviceSDK(dockSn)); if (!reply.getData().getResult().isSuccess()) { throw new RuntimeException("Failed to pause wayline job. Error: " + reply.getData().getResult()); } waylineRedisService.delRunningWaylineJob(dockSn); waylineRedisService.setPausedWaylineJob(dockSn, jobId); } private void resumeJob(String workspaceId, String dockSn, String jobId, WaylineJobStatusEnum statusEnum) { Optional> runningDataOpt = waylineRedisService.getRunningWaylineJob(dockSn); if (WaylineJobStatusEnum.IN_PROGRESS == statusEnum && jobId.equals(runningDataOpt.map(EventsReceiver::getSn).get())) { waylineRedisService.setRunningWaylineJob(dockSn, runningDataOpt.get()); return; } TopicServicesResponse reply = abstractWaylineService.flighttaskRecovery(SDKManager.getDeviceSDK(dockSn)); if (!reply.getData().getResult().isSuccess()) { throw new RuntimeException("Failed to resume wayline job. Error: " + reply.getData().getResult()); } runningDataOpt.ifPresent(runningData -> waylineRedisService.setRunningWaylineJob(dockSn, runningData)); waylineRedisService.delPausedWaylineJob(dockSn); } @Override public void retryPrepareJob(ConditionalWaylineJobKey jobKey, WaylineJobDTO waylineJob) { Optional childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId()); if (childJobOpt.isEmpty()) { log.error("Failed to create wayline job."); return; } WaylineJobDTO newJob = childJobOpt.get(); newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME)); boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(newJob); if (!isAdd) { log.error("Failed to create wayline job. {}", newJob.getJobId()); return; } waylineJob.setJobId(newJob.getJobId()); waylineRedisService.setConditionalWaylineJob(waylineJob); } @Override public TopicEventsResponse flighttaskReady(TopicEventsRequest request, MessageHeaders headers) { WebSocketServerPlayBack.sendInfo(request.toString()); // List flightIds = response.getData().getFlightIds(); // // log.info("ready task list:{}", Arrays.toString(flightIds.toArray()) ); // // Check conditional task blocking status. // String blockedId = waylineRedisService.getBlockedWaylineJobId(response.getGateway()); // if (!StringUtils.hasText(blockedId)) { // return null; // } // // Optional deviceOpt = deviceRedisService.getDeviceOnline(response.getGateway()); // if (deviceOpt.isEmpty()) { // return null; // } // DeviceDTO device = deviceOpt.get(); // // try { // for (String jobId : flightIds) { // boolean isExecute = this.executeFlightTask(device.getWorkspaceId(), jobId); // if (!isExecute) { // return null; // } // Optional waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobId); // if (waylineJobOpt.isEmpty()) { // log.info("The conditional job has expired and will no longer be executed."); // return new TopicEventsResponse<>(); // } // WaylineJobDTO waylineJob = waylineJobOpt.get(); // this.retryPrepareJob(new ConditionalWaylineJobKey(device.getWorkspaceId(), response.getGateway(), jobId), waylineJob); // return new TopicEventsResponse<>(); // } // } catch (Exception e) { // log.error("Failed to execute conditional task."); // e.printStackTrace(); // } return new TopicEventsResponse<>(); } @Override public TopicEventsResponse returnHomeInfo(TopicRequestsRequest request, MessageHeaders headers) { WebSocketServerPlayBack.sendInfo(request.toString()); return new TopicEventsResponse<>(); } }