package com.dji.sample.wayline.service.impl;
|
|
import com.dji.sample.common.error.CommonErrorEnum;
|
import com.dji.sample.common.model.CustomClaim;
|
import com.dji.sample.component.mqtt.model.EventsReceiver;
|
import com.dji.sample.component.redis.RedisConst;
|
import com.dji.sample.component.redis.RedisOpsUtils;
|
import com.dji.sample.component.websocket.service.IWebSocketMessageService;
|
import com.dji.sample.component.websocketWmm.WebSocketServerPlayBack;
|
import com.dji.sample.manage.model.dto.DeviceDTO;
|
import com.dji.sample.manage.service.IDeviceRedisService;
|
import com.dji.sample.media.model.MediaFileCountDTO;
|
import com.dji.sample.media.service.IMediaRedisService;
|
import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey;
|
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
|
import com.dji.sample.wayline.model.dto.WaylineTaskConditionDTO;
|
import com.dji.sample.wayline.model.enums.WaylineErrorCodeEnum;
|
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
|
import com.dji.sample.wayline.model.param.CreateJobParam;
|
import com.dji.sample.wayline.model.param.UpdateJobParam;
|
import com.dji.sample.wayline.service.IFlightTaskService;
|
import com.dji.sample.wayline.service.IWaylineFileService;
|
import com.dji.sample.wayline.service.IWaylineJobService;
|
import com.dji.sample.wayline.service.IWaylineRedisService;
|
import com.dji.sdk.cloudapi.device.ExitWaylineWhenRcLostEnum;
|
import com.dji.sdk.cloudapi.media.UploadFlighttaskMediaPrioritize;
|
import com.dji.sdk.cloudapi.media.api.AbstractMediaService;
|
import com.dji.sdk.cloudapi.wayline.*;
|
import com.dji.sdk.cloudapi.wayline.api.AbstractWaylineService;
|
import com.dji.sdk.common.HttpResultResponse;
|
import com.dji.sdk.common.SDKManager;
|
import com.dji.sdk.mqtt.MqttReply;
|
import com.dji.sdk.mqtt.events.TopicEventsRequest;
|
import com.dji.sdk.mqtt.events.TopicEventsResponse;
|
import com.dji.sdk.mqtt.requests.TopicRequestsRequest;
|
import com.dji.sdk.mqtt.requests.TopicRequestsResponse;
|
import com.dji.sdk.mqtt.services.ServicesReplyData;
|
import com.dji.sdk.mqtt.services.TopicServicesResponse;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.http.HttpStatus;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.messaging.MessageHeaders;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Service;
|
import org.springframework.util.StringUtils;
|
|
import java.net.URL;
|
import java.sql.SQLException;
|
import java.time.*;
|
import java.util.*;
|
import java.util.concurrent.TimeUnit;
|
import java.util.stream.Collectors;
|
|
/**
|
* @author sean
|
* @version 1.1
|
* @date 2022/6/9
|
*/
|
@Service
|
@Slf4j
|
public class FlightTaskServiceImpl extends AbstractWaylineService implements IFlightTaskService {
|
|
// Injected JSON mapper; not referenced by the methods visible in this class.
@Autowired
|
private ObjectMapper mapper;
|
|
// Injected websocket message service; not referenced by the methods visible in this class.
@Autowired
|
private IWebSocketMessageService websocketMessageService;
|
|
// Creates, looks up and updates wayline job records.
@Autowired
|
private IWaylineJobService waylineJobService;
|
|
// Used to check whether a dock is online before a command is sent to it.
@Autowired
|
private IDeviceRedisService deviceRedisService;
|
|
// Holds the conditional / running / paused / blocked wayline job state.
@Autowired
|
private IWaylineRedisService waylineRedisService;
|
|
// Injected media redis service; not referenced by the methods visible in this class.
@Autowired
|
private IMediaRedisService mediaRedisService;
|
|
// Resolves wayline files and their download URLs.
@Autowired
|
private IWaylineFileService waylineFileService;
|
|
// Sends the flight task commands (prepare, execute, undo, pause, recovery) to the dock.
@Autowired
|
private SDKWaylineService abstractWaylineService;
|
|
// Sends the "prioritize media upload" command to the dock.
@Autowired
|
@Qualifier("mediaServiceImpl")
|
private AbstractMediaService abstractMediaService;
|
|
@Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
|
public void checkScheduledJob() {
|
Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE);
|
if (Objects.isNull(jobIdValue)) {
|
return;
|
}
|
log.info("Check the timed tasks of the wayline. {}", jobIdValue);
|
// format: {workspace_id}:{dock_sn}:{job_id}
|
String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER);
|
double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
|
long now = System.currentTimeMillis();
|
int offset = 30_000;
|
|
// Expired tasks are deleted directly.
|
if (time < now - offset) {
|
RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
|
waylineJobService.updateJob(WaylineJobDTO.builder()
|
.jobId(jobArr[2])
|
.status(WaylineJobStatusEnum.FAILED.getVal())
|
.executeTime(LocalDateTime.now())
|
.completedTime(LocalDateTime.now())
|
.code(HttpStatus.SC_REQUEST_TIMEOUT).build());
|
return;
|
}
|
|
if (now <= time && time <= now + offset) {
|
try {
|
this.executeFlightTask(jobArr[0], jobArr[2]);
|
} catch (Exception e) {
|
log.info("The scheduled task delivery failed.");
|
waylineJobService.updateJob(WaylineJobDTO.builder()
|
.jobId(jobArr[2])
|
.status(WaylineJobStatusEnum.FAILED.getVal())
|
.executeTime(LocalDateTime.now())
|
.completedTime(LocalDateTime.now())
|
.code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build());
|
} finally {
|
RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
|
}
|
}
|
}
|
|
@Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
|
public void prepareConditionJob() {
|
Optional<ConditionalWaylineJobKey> jobKeyOpt = waylineRedisService.getNearestConditionalWaylineJob();
|
if (jobKeyOpt.isEmpty()) {
|
return;
|
}
|
ConditionalWaylineJobKey jobKey = jobKeyOpt.get();
|
log.info("Check the conditional tasks of the wayline. {}", jobKey.toString());
|
// format: {workspace_id}:{dock_sn}:{job_id}
|
double time = waylineRedisService.getConditionalWaylineJobTime(jobKey);
|
long now = System.currentTimeMillis();
|
// prepare the task one day in advance.
|
int offset = 86_400_000;
|
|
if (now + offset < time) {
|
return;
|
}
|
|
WaylineJobDTO job = WaylineJobDTO.builder()
|
.jobId(jobKey.getJobId())
|
.status(WaylineJobStatusEnum.FAILED.getVal())
|
.executeTime(LocalDateTime.now())
|
.completedTime(LocalDateTime.now())
|
.code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build();
|
try {
|
Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId());
|
if (waylineJobOpt.isEmpty()) {
|
job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getCode());
|
waylineJobService.updateJob(job);
|
waylineRedisService.removePrepareConditionalWaylineJob(jobKey);
|
return;
|
}
|
WaylineJobDTO waylineJob = waylineJobOpt.get();
|
|
HttpResultResponse result = this.publishOneFlightTask(waylineJob);
|
waylineRedisService.removePrepareConditionalWaylineJob(jobKey);
|
|
if (HttpResultResponse.CODE_SUCCESS == result.getCode()) {
|
return;
|
}
|
|
// If the end time is exceeded, no more retries will be made.
|
waylineRedisService.delConditionalWaylineJob(jobKey.getJobId());
|
if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - RedisConst.WAYLINE_JOB_BLOCK_TIME * 1000 < now) {
|
return;
|
}
|
|
// Retry if the end time has not been exceeded.
|
this.retryPrepareJob(jobKey, waylineJob);
|
|
} catch (Exception e) {
|
log.info("Failed to prepare the conditional task.");
|
waylineJobService.updateJob(job);
|
}
|
|
}
|
|
/**
|
* For immediate tasks, the server time shall prevail.
|
* @param param
|
*/
|
private void fillImmediateTime(CreateJobParam param) {
|
if (TaskTypeEnum.IMMEDIATE != param.getTaskType()) {
|
return;
|
}
|
long now = System.currentTimeMillis() / 1000;
|
param.setTaskDays(List.of(now));
|
param.setTaskPeriods(List.of(List.of(now)));
|
}
|
|
|
private void addConditions(WaylineJobDTO waylineJob, CreateJobParam param, Long beginTime, Long endTime) {
|
if (TaskTypeEnum.CONDITIONAL != param.getTaskType()) {
|
return;
|
}
|
|
waylineJob.setConditions(
|
WaylineTaskConditionDTO.builder()
|
.executableConditions(Objects.nonNull(param.getMinStorageCapacity()) ?
|
new ExecutableConditions().setStorageCapacity(param.getMinStorageCapacity()) : null)
|
.readyConditions(new ReadyConditions()
|
.setBatteryCapacity(param.getMinBatteryCapacity())
|
.setBeginTime(beginTime)
|
.setEndTime(endTime))
|
.build());
|
|
waylineRedisService.setConditionalWaylineJob(waylineJob);
|
// key: wayline_job_condition, value: {workspace_id}:{dock_sn}:{job_id}
|
boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(waylineJob);
|
if (!isAdd) {
|
throw new RuntimeException("Failed to create conditional job.");
|
}
|
}
|
|
@Override
|
public HttpResultResponse publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException {
|
fillImmediateTime(param);
|
|
for (Long taskDay : param.getTaskDays()) {
|
LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(taskDay), ZoneId.systemDefault());
|
for (List<Long> taskPeriod : param.getTaskPeriods()) {
|
long beginTime = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(0)), ZoneId.systemDefault()))
|
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
long endTime = taskPeriod.size() > 1 ?
|
LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(1)), ZoneId.systemDefault()))
|
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() : beginTime;
|
if (TaskTypeEnum.IMMEDIATE != param.getTaskType() && endTime < System.currentTimeMillis()) {
|
continue;
|
}
|
|
Optional<WaylineJobDTO> waylineJobOpt = waylineJobService.createWaylineJob(param, customClaim.getWorkspaceId(), customClaim.getUsername(), beginTime, endTime);
|
if (waylineJobOpt.isEmpty()) {
|
throw new SQLException("Failed to create wayline job.");
|
}
|
WaylineJobDTO waylineJob = waylineJobOpt.get();
|
// If it is a conditional task type, add conditions to the job parameters.
|
addConditions(waylineJob, param, beginTime, endTime);
|
|
HttpResultResponse response = this.publishOneFlightTask(waylineJob);
|
if (HttpResultResponse.CODE_SUCCESS != response.getCode()) {
|
return response;
|
}
|
}
|
}
|
return HttpResultResponse.success();
|
}
|
|
public HttpResultResponse publishOneFlightTask(WaylineJobDTO waylineJob) throws SQLException {
|
|
boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn());
|
if (!isOnline) {
|
throw new RuntimeException("Dock is offline.");
|
}
|
|
boolean isSuccess = this.prepareFlightTask(waylineJob);
|
if (!isSuccess) {
|
return HttpResultResponse.error("Failed to prepare job.");
|
}
|
|
// Issue an immediate task execution command.
|
if (TaskTypeEnum.IMMEDIATE == waylineJob.getTaskType()) {
|
if (!executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId())) {
|
return HttpResultResponse.error("Failed to execute job.");
|
}
|
}
|
|
if (TaskTypeEnum.TIMED == waylineJob.getTaskType()) {
|
// key: wayline_job_timed, value: {workspace_id}:{dock_sn}:{job_id}
|
boolean isAdd = RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_TIMED_EXECUTE,
|
waylineJob.getWorkspaceId() + RedisConst.DELIMITER + waylineJob.getDockSn() + RedisConst.DELIMITER + waylineJob.getJobId(),
|
waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
|
if (!isAdd) {
|
return HttpResultResponse.error("Failed to create scheduled job.");
|
}
|
}
|
|
return HttpResultResponse.success();
|
}
|
|
private Boolean prepareFlightTask(WaylineJobDTO waylineJob) throws SQLException {
|
// get wayline file
|
Optional<GetWaylineListResponse> waylineFile = waylineFileService.getWaylineByWaylineId(waylineJob.getWorkspaceId(), waylineJob.getFileId());
|
if (waylineFile.isEmpty()) {
|
throw new SQLException("Wayline file doesn't exist.");
|
}
|
|
// get file url
|
URL url = waylineFileService.getObjectUrl(waylineJob.getWorkspaceId(), waylineFile.get().getId());
|
|
FlighttaskPrepareRequest flightTask = new FlighttaskPrepareRequest()
|
.setFlightId(waylineJob.getJobId())
|
.setExecuteTime(waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())
|
.setTaskType(waylineJob.getTaskType())
|
.setWaylineType(waylineJob.getWaylineType())
|
.setRthAltitude(waylineJob.getRthAltitude())
|
.setOutOfControlAction(waylineJob.getOutOfControlAction())
|
.setExitWaylineWhenRcLost(ExitWaylineWhenRcLostEnum.EXECUTE_RC_LOST_ACTION)
|
.setFile(new FlighttaskFile()
|
.setUrl(url.toString())
|
.setFingerprint(waylineFile.get().getSign()));
|
|
if (TaskTypeEnum.CONDITIONAL == waylineJob.getTaskType()) {
|
if (Objects.isNull(waylineJob.getConditions())) {
|
throw new IllegalArgumentException();
|
}
|
flightTask.setReadyConditions(waylineJob.getConditions().getReadyConditions());
|
flightTask.setExecutableConditions(waylineJob.getConditions().getExecutableConditions());
|
}
|
|
TopicServicesResponse<ServicesReplyData> serviceReply = abstractWaylineService.flighttaskPrepare(
|
SDKManager.getDeviceSDK(waylineJob.getDockSn()), flightTask,"");
|
if (!serviceReply.getData().getResult().isSuccess()) {
|
log.info("Prepare task ====> Error code: {}", serviceReply.getData().getResult());
|
waylineJobService.updateJob(WaylineJobDTO.builder()
|
.workspaceId(waylineJob.getWorkspaceId())
|
.jobId(waylineJob.getJobId())
|
.executeTime(LocalDateTime.now())
|
.status(WaylineJobStatusEnum.FAILED.getVal())
|
.completedTime(LocalDateTime.now())
|
.code(serviceReply.getData().getResult().getCode()).build());
|
return false;
|
}
|
return true;
|
}
|
|
|
@Override
|
public Boolean executeFlightTask(String workspaceId, String jobId) {
|
// get job
|
Optional<WaylineJobDTO> waylineJob = waylineJobService.getJobByJobId(workspaceId, jobId);
|
if (waylineJob.isEmpty()) {
|
throw new IllegalArgumentException("Job doesn't exist.");
|
}
|
|
boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.get().getDockSn());
|
if (!isOnline) {
|
throw new RuntimeException("Dock is offline.");
|
}
|
|
WaylineJobDTO job = waylineJob.get();
|
|
TopicServicesResponse<ServicesReplyData> serviceReply = abstractWaylineService.flighttaskExecute(
|
SDKManager.getDeviceSDK(job.getDockSn()), new FlighttaskExecuteRequest().setFlightId(jobId),"");
|
if (!serviceReply.getData().getResult().isSuccess()) {
|
log.info("Execute job ====> Error: {}", serviceReply.getData().getResult());
|
waylineJobService.updateJob(WaylineJobDTO.builder()
|
.jobId(jobId)
|
.executeTime(LocalDateTime.now())
|
.status(WaylineJobStatusEnum.FAILED.getVal())
|
.completedTime(LocalDateTime.now())
|
.code(serviceReply.getData().getResult().getCode()).build());
|
// The conditional task fails and enters the blocking status.
|
if (TaskTypeEnum.CONDITIONAL == job.getTaskType()
|
&& WaylineErrorCodeEnum.find(serviceReply.getData().getResult().getCode()).isBlock()) {
|
waylineRedisService.setBlockedWaylineJob(job.getDockSn(), jobId);
|
}
|
return false;
|
}
|
|
waylineJobService.updateJob(WaylineJobDTO.builder()
|
.jobId(jobId)
|
.executeTime(LocalDateTime.now())
|
.status(WaylineJobStatusEnum.IN_PROGRESS.getVal())
|
.build());
|
waylineRedisService.setRunningWaylineJob(job.getDockSn(), EventsReceiver.<FlighttaskProgress>builder().bid(jobId).sn(job.getDockSn()).build());
|
return true;
|
}
|
|
@Override
|
public void cancelFlightTask(String workspaceId, Collection<String> jobIds) {
|
List<WaylineJobDTO> waylineJobs = waylineJobService.getJobsByConditions(workspaceId, jobIds, WaylineJobStatusEnum.PENDING);
|
|
Set<String> waylineJobIds = waylineJobs.stream().map(WaylineJobDTO::getJobId).collect(Collectors.toSet());
|
// Check if the task status is correct.
|
boolean isErr = !jobIds.removeAll(waylineJobIds) || !jobIds.isEmpty() ;
|
if (isErr) {
|
throw new IllegalArgumentException("These tasks have an incorrect status and cannot be canceled. " + Arrays.toString(jobIds.toArray()));
|
}
|
|
// Group job id by dock sn.
|
Map<String, List<String>> dockJobs = waylineJobs.stream()
|
.collect(Collectors.groupingBy(WaylineJobDTO::getDockSn,
|
Collectors.mapping(WaylineJobDTO::getJobId, Collectors.toList())));
|
dockJobs.forEach((dockSn, idList) -> this.publishCancelTask(workspaceId, dockSn, idList));
|
|
}
|
|
public void publishCancelTask(String workspaceId, String dockSn, List<String> jobIds) {
|
boolean isOnline = deviceRedisService.checkDeviceOnline(dockSn);
|
if (!isOnline) {
|
throw new RuntimeException("Dock is offline.");
|
}
|
|
TopicServicesResponse<ServicesReplyData> serviceReply = abstractWaylineService.flighttaskUndo(SDKManager.getDeviceSDK(dockSn),
|
new FlighttaskUndoRequest().setFlightIds(jobIds));
|
if (!serviceReply.getData().getResult().isSuccess()) {
|
log.info("Cancel job ====> Error: {}", serviceReply.getData().getResult());
|
throw new RuntimeException("Failed to cancel the wayline job of " + dockSn);
|
}
|
|
for (String jobId : jobIds) {
|
waylineJobService.updateJob(WaylineJobDTO.builder()
|
.workspaceId(workspaceId)
|
.jobId(jobId)
|
.status(WaylineJobStatusEnum.CANCEL.getVal())
|
.completedTime(LocalDateTime.now())
|
.build());
|
RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, workspaceId + RedisConst.DELIMITER + dockSn + RedisConst.DELIMITER + jobId);
|
}
|
|
}
|
|
@Override
|
public void uploadMediaHighestPriority(String workspaceId, String jobId) {
|
Optional<WaylineJobDTO> jobOpt = waylineJobService.getJobByJobId(workspaceId, jobId);
|
if (jobOpt.isEmpty()) {
|
throw new RuntimeException(CommonErrorEnum.ILLEGAL_ARGUMENT.getMessage());
|
}
|
|
String dockSn = jobOpt.get().getDockSn();
|
String key = RedisConst.MEDIA_HIGHEST_PRIORITY_PREFIX + dockSn;
|
if (RedisOpsUtils.checkExist(key) && jobId.equals(((MediaFileCountDTO) RedisOpsUtils.get(key)).getJobId())) {
|
return;
|
}
|
|
TopicServicesResponse<ServicesReplyData> reply = abstractMediaService.uploadFlighttaskMediaPrioritize(
|
SDKManager.getDeviceSDK(dockSn), new UploadFlighttaskMediaPrioritize().setFlightId(jobId));
|
if (!reply.getData().getResult().isSuccess()) {
|
throw new RuntimeException("Failed to set media job upload priority. Error: " + reply.getData().getResult());
|
}
|
}
|
|
@Override
|
public void updateJobStatus(String workspaceId, String jobId, UpdateJobParam param) {
|
Optional<WaylineJobDTO> waylineJobOpt = waylineJobService.getJobByJobId(workspaceId, jobId);
|
if (waylineJobOpt.isEmpty()) {
|
throw new RuntimeException("The job does not exist.");
|
}
|
WaylineJobDTO waylineJob = waylineJobOpt.get();
|
WaylineJobStatusEnum statusEnum = waylineJobService.getWaylineState(waylineJob.getDockSn());
|
if (statusEnum.getEnd() || WaylineJobStatusEnum.PENDING == statusEnum) {
|
throw new RuntimeException("The wayline job status does not match, and the operation cannot be performed.");
|
}
|
|
switch (param.getStatus()) {
|
case PAUSE:
|
pauseJob(workspaceId, waylineJob.getDockSn(), jobId, statusEnum);
|
break;
|
case RESUME:
|
resumeJob(workspaceId, waylineJob.getDockSn(), jobId, statusEnum);
|
break;
|
}
|
|
}
|
|
private void pauseJob(String workspaceId, String dockSn, String jobId, WaylineJobStatusEnum statusEnum) {
|
if (WaylineJobStatusEnum.PAUSED == statusEnum && jobId.equals(waylineRedisService.getPausedWaylineJobId(dockSn))) {
|
waylineRedisService.setPausedWaylineJob(dockSn, jobId);
|
return;
|
}
|
|
TopicServicesResponse<ServicesReplyData> reply = abstractWaylineService.flighttaskPause(SDKManager.getDeviceSDK(dockSn));
|
if (!reply.getData().getResult().isSuccess()) {
|
throw new RuntimeException("Failed to pause wayline job. Error: " + reply.getData().getResult());
|
}
|
waylineRedisService.delRunningWaylineJob(dockSn);
|
waylineRedisService.setPausedWaylineJob(dockSn, jobId);
|
}
|
|
private void resumeJob(String workspaceId, String dockSn, String jobId, WaylineJobStatusEnum statusEnum) {
|
Optional<EventsReceiver<FlighttaskProgress>> runningDataOpt = waylineRedisService.getRunningWaylineJob(dockSn);
|
if (WaylineJobStatusEnum.IN_PROGRESS == statusEnum && jobId.equals(runningDataOpt.map(EventsReceiver::getSn).get())) {
|
waylineRedisService.setRunningWaylineJob(dockSn, runningDataOpt.get());
|
return;
|
}
|
TopicServicesResponse<ServicesReplyData> reply = abstractWaylineService.flighttaskRecovery(SDKManager.getDeviceSDK(dockSn));
|
if (!reply.getData().getResult().isSuccess()) {
|
throw new RuntimeException("Failed to resume wayline job. Error: " + reply.getData().getResult());
|
}
|
|
runningDataOpt.ifPresent(runningData -> waylineRedisService.setRunningWaylineJob(dockSn, runningData));
|
waylineRedisService.delPausedWaylineJob(dockSn);
|
}
|
|
@Override
|
public void retryPrepareJob(ConditionalWaylineJobKey jobKey, WaylineJobDTO waylineJob) {
|
Optional<WaylineJobDTO> childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId());
|
if (childJobOpt.isEmpty()) {
|
log.error("Failed to create wayline job.");
|
return;
|
}
|
|
WaylineJobDTO newJob = childJobOpt.get();
|
newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME));
|
boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(newJob);
|
if (!isAdd) {
|
log.error("Failed to create wayline job. {}", newJob.getJobId());
|
return;
|
}
|
|
waylineJob.setJobId(newJob.getJobId());
|
waylineRedisService.setConditionalWaylineJob(waylineJob);
|
}
|
|
|
@Override
|
public TopicEventsResponse<MqttReply> flighttaskReady(TopicEventsRequest<FlighttaskReady> request, MessageHeaders headers) {
|
|
WebSocketServerPlayBack.sendInfo(request.toString());
|
// List<String> flightIds = response.getData().getFlightIds();
|
//
|
// log.info("ready task list:{}", Arrays.toString(flightIds.toArray()) );
|
// // Check conditional task blocking status.
|
// String blockedId = waylineRedisService.getBlockedWaylineJobId(response.getGateway());
|
// if (!StringUtils.hasText(blockedId)) {
|
// return null;
|
// }
|
//
|
// Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(response.getGateway());
|
// if (deviceOpt.isEmpty()) {
|
// return null;
|
// }
|
// DeviceDTO device = deviceOpt.get();
|
//
|
// try {
|
// for (String jobId : flightIds) {
|
// boolean isExecute = this.executeFlightTask(device.getWorkspaceId(), jobId);
|
// if (!isExecute) {
|
// return null;
|
// }
|
// Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobId);
|
// if (waylineJobOpt.isEmpty()) {
|
// log.info("The conditional job has expired and will no longer be executed.");
|
// return new TopicEventsResponse<>();
|
// }
|
// WaylineJobDTO waylineJob = waylineJobOpt.get();
|
// this.retryPrepareJob(new ConditionalWaylineJobKey(device.getWorkspaceId(), response.getGateway(), jobId), waylineJob);
|
// return new TopicEventsResponse<>();
|
// }
|
// } catch (Exception e) {
|
// log.error("Failed to execute conditional task.");
|
// e.printStackTrace();
|
// }
|
return new TopicEventsResponse<>();
|
}
|
|
|
@Override
|
public TopicEventsResponse<MqttReply> returnHomeInfo(TopicRequestsRequest<ReturnHomeInfo> request, MessageHeaders headers) {
|
|
WebSocketServerPlayBack.sendInfo(request.toString());
|
return new TopicEventsResponse<>();
|
}
|
|
}
|