package com.bcxin.ferry.service;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.crypto.digest.MD5;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bcxin.ferry.common.CommonConstant;
import com.bcxin.ferry.common.emus.FerryReceiveTaskStatusEnum;
import com.bcxin.ferry.common.emus.FileTypeEnum;
import com.bcxin.ferry.common.emus.SendBoundaryUrlEnum;
import com.bcxin.ferry.common.utils.Base64Util;
import com.bcxin.ferry.common.utils.DateUtil;
import com.bcxin.ferry.common.utils.FileSpiltMergeUtil;
import com.bcxin.ferry.common.utils.IdGeneratorSnowflake;
import com.bcxin.ferry.configs.ReceiveConfig;
import com.bcxin.ferry.configs.SchedulingConfig;
import com.bcxin.ferry.dao.mapper.FerryReceiveTaskMapper;
import com.bcxin.ferry.dtos.FerryFileDto;
import com.bcxin.ferry.dtos.FerryTaskInfoDto;
import com.bcxin.ferry.dtos.baiduutil.FerryReceiveTaskPushResult;
import com.bcxin.ferry.entity.FerryReceiveTaskEntity;
import com.bcxin.ferry.entity.FerryReceiveTaskFileEntity;
import com.bcxin.ferry.entity.FerryTaskFileEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Service implementation for the receive-task table (ferry_receive_task).
 *
 * <p>Each public method moves a receive task from one
 * {@link FerryReceiveTaskStatusEnum} status to the next, or queries tasks by status.
 *
 * @author : linchunpeng
 * @date : 2024-3-6
 */
@Slf4j
@Service
public class FerryReceiveTaskService extends ServiceImpl<FerryReceiveTaskMapper, FerryReceiveTaskEntity> {

    /** How long after the last completed task a new task is expected to have been created. */
    private static final long NEW_TASK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(90);

    @Autowired
    private SchedulingConfig schedulingConfig;
    @Autowired
    private ReceiveConfig receiveConfig;
    @Autowired
    private FerryReceiveTaskFileService ferryReceiveTaskFileService;
    @Autowired
    private IdGeneratorSnowflake snowflake;
    @Autowired
    private RetryService retryService;
    @Autowired
    private RedissonClient redissonClient;

    /**
     * Registers a new receive task for an incoming ferry task file.
     *
     * @param receiveDto incoming request; its ferryTaskId must be a parseable long
     * @return the package directory URL assigned to the new task
     *         (configured package URL followed by the request id)
     * @throws RuntimeException if a receive task with the same ferryTaskId already exists
     * @author linchunpeng
     * @date 2024/3/14
     */
    @Transactional
    public String receiveRequest(FerryFileDto receiveDto) {
        log.info("接收摆渡任务文件");
        long ferryTaskId = Long.parseLong(receiveDto.getFerryTaskId());
        LambdaQueryChainWrapper<FerryReceiveTaskEntity> lqw = this.lambdaQuery();
        lqw.eq(FerryReceiveTaskEntity::getFerryTaskId, ferryTaskId);
        if (lqw.exists()) {
            log.info("该任务文件已经接收了");
            throw new RuntimeException("该任务文件已经接收了");
        }

        log.info("创建接收任务");
        FerryReceiveTaskEntity receiveTaskEntity = new FerryReceiveTaskEntity();
        receiveTaskEntity.setId(snowflake.snowflakeId());
        receiveTaskEntity.setFerryTaskId(ferryTaskId);
        receiveTaskEntity.setRequestId(receiveDto.getRequestId());
        receiveTaskEntity.setPackageUrl(receiveConfig.getPackageUrl().concat(receiveDto.getRequestId()));
        receiveTaskEntity.setTaskStatus(FerryReceiveTaskStatusEnum.RECEIVE_REQUEST.getCode());
        receiveTaskEntity.setCreateTime(new Date());
        receiveTaskEntity.setUpdateTime(new Date());
        this.save(receiveTaskEntity);
        return receiveTaskEntity.getPackageUrl();
    }

    /**
     * Decodes the base64 task file to disk, parses it as JSON and records the file counts
     * on the receive task.
     *
     * @param receiveDto incoming request carrying the base64 payload and the ferryTaskId
     * @return the file entries listed in the task file, or {@code null} when no receive task
     *         exists for the ferryTaskId
     * @author linchunpeng
     * @date 2024/3/14
     */
    @Transactional
    public List<FerryTaskFileEntity> parseTaskFile(FerryFileDto receiveDto) {
        log.info("解析任务文件");
        FerryReceiveTaskEntity receiveTaskEntity = findByFerryTaskId(Long.parseLong(receiveDto.getFerryTaskId()));
        if (receiveTaskEntity == null) {
            return null;
        }

        // Write the task file into the task's package directory.
        log.info("取出任务文件");
        File taskFile = Base64Util.base64ToFile(receiveDto.getImg_base64(),
                taskFilePath(receiveTaskEntity.getPackageUrl(), receiveDto.getFerryTaskId()), true);
        log.info("任务文件:{}", taskFile.getName());
        String fileContent = FileUtil.readUtf8String(taskFile);
        FerryTaskInfoDto ferryTaskInfoDto = JSONObject.parseObject(fileContent, FerryTaskInfoDto.class);
        List<FerryTaskFileEntity> taskFileEntityList = ferryTaskInfoDto.getFerryTaskFileEntityList();

        receiveTaskEntity.setFileTotalCount(CollectionUtil.isNotEmpty(taskFileEntityList) ? taskFileEntityList.size() : 0);
        // The received count starts at 1 here.
        receiveTaskEntity.setFileReceiveCount(1);
        receiveTaskEntity.setTaskStatus(FerryReceiveTaskStatusEnum.PARSE_TASK_FILE.getCode());
        receiveTaskEntity.setUpdateTime(new Date());
        this.updateById(receiveTaskEntity);
        return taskFileEntityList;
    }

    /**
     * Creates one receive-file record per entry of the task file and moves the task to
     * {@code INIT_TASK_INFO}. Does nothing when the task is unknown or the list is empty.
     *
     * @param receiveDto              incoming request; its fileId identifies the task file itself
     * @param ferryTaskFileEntityList entries parsed from the task file
     * @param packageUrl              package directory holding the task file
     * @author linchunpeng
     * @date 2024/3/14
     */
    @Transactional
    public void initTaskInfo(FerryFileDto receiveDto, List<FerryTaskFileEntity> ferryTaskFileEntityList, String packageUrl) {
        log.info("初始化任务信息");
        FerryReceiveTaskEntity receiveTaskEntity = findByFerryTaskId(Long.parseLong(receiveDto.getFerryTaskId()));
        if (receiveTaskEntity == null || CollectionUtil.isEmpty(ferryTaskFileEntityList)) {
            return;
        }

        long taskFileId = Long.parseLong(receiveDto.getFileId());
        List<FerryReceiveTaskFileEntity> ferryReceiveTaskFileEntityList = new ArrayList<>();
        for (FerryTaskFileEntity fileEntity : ferryTaskFileEntityList) {
            FerryReceiveTaskFileEntity receiveTaskFileEntity =
                    ferryReceiveTaskFileService.createFerryReceiveTaskFile(receiveTaskEntity, fileEntity);
            if (receiveTaskFileEntity.getId() == taskFileId) {
                // The task file is already on disk, so mark it received immediately.
                log.info("任务文件,直接设置接收完成");
                File taskFile = new File(taskFilePath(packageUrl, receiveDto.getFerryTaskId()));
                receiveTaskFileEntity.setReceiveFileMd5(MD5.create().digestHex16(taskFile));
                receiveTaskFileEntity.setReceiveResult("接收完成");
                receiveTaskFileEntity.setFileStatus(2);
            }
            ferryReceiveTaskFileEntityList.add(receiveTaskFileEntity);
        }

        log.info("本次摆渡总文件数量:{}", ferryReceiveTaskFileEntityList.size());
        ferryReceiveTaskFileService.saveBatch(ferryReceiveTaskFileEntityList);
        log.info("本次摆渡总文件都保存入库完成");

        receiveTaskEntity.setTaskStatus(FerryReceiveTaskStatusEnum.INIT_TASK_INFO.getCode());
        receiveTaskEntity.setUpdateTime(new Date());
        this.updateById(receiveTaskEntity);
        log.info("修改接收任务状态为:{}", FerryReceiveTaskStatusEnum.INIT_TASK_INFO.getCode());
    }

    /**
     * Sends the task-file callback to the requesting side and moves the task to
     * {@code TASK_FILE_CALLBACK}. Only acts when the task is currently in {@code INIT_TASK_INFO}.
     *
     * @param receiveDto incoming request carrying ferryTaskId and fileId
     * @author linchunpeng
     * @date 2024/3/14
     */
    @Transactional
    public void taskFileCallback(FerryFileDto receiveDto) {
        log.info("任务文件回调请求端");
        FerryReceiveTaskEntity receiveTaskEntity = findByFerryTaskId(Long.parseLong(receiveDto.getFerryTaskId()));
        if (receiveTaskEntity != null
                && receiveTaskEntity.getTaskStatus() == FerryReceiveTaskStatusEnum.INIT_TASK_INFO.getCode()) {
            // Callback
            ferryReceiveTaskFileService.callbackToBoundary(Long.parseLong(receiveDto.getFileId()),
                    SendBoundaryUrlEnum.SEND_TASK_FILE_CALLBACK);
            receiveTaskEntity.setTaskStatus(FerryReceiveTaskStatusEnum.TASK_FILE_CALLBACK.getCode());
            receiveTaskEntity.setUpdateTime(new Date());
            this.updateById(receiveTaskEntity);
        }
    }

    /**
     * Moves the task from {@code TASK_FILE_CALLBACK} to {@code RECEIVE_DETAIL_FILE_BEGIN}.
     * Does nothing when the task is unknown or in any other status.
     *
     * @param ferryTaskId id of the ferry task
     * @author linchunpeng
     * @date 2024/3/15
     */
    @Transactional
    public void updateTaskStatusReceiveDetailFileBegin(Long ferryTaskId) {
        log.info("接收明细文件开始");
        FerryReceiveTaskEntity receiveTaskEntity = findByFerryTaskId(ferryTaskId);
        if (receiveTaskEntity != null
                && receiveTaskEntity.getTaskStatus() == FerryReceiveTaskStatusEnum.TASK_FILE_CALLBACK.getCode()) {
            receiveTaskEntity.setTaskStatus(FerryReceiveTaskStatusEnum.RECEIVE_DETAIL_FILE_BEGIN.getCode());
            receiveTaskEntity.setUpdateTime(new Date());
            this.updateById(receiveTaskEntity);
        }
    }

    /**
     * Increments the task's received-file counter and, once no file of the task is incomplete,
     * moves the task to {@code RECEIVE_DETAIL_FILE_COMPLETE}.
     *
     * <p>The update runs under a per-task Redisson lock. Exceptions raised while updating are
     * logged and not rethrown.
     *
     * @param ferryTaskId id of the ferry task; must not be {@code null}
     * @author linchunpeng
     * @date 2024/3/15
     */
    @Transactional
    public void updateTaskStatusReceiveDetailFileComplete(Long ferryTaskId) {
        log.info("接收明细文件完成");
        // Distributed lock key
        String lockKey = "REDISSON_LOCK_UPDATE_FERRY_RECEIVE_TASK_STATUS_" + ferryTaskId.toString();
        log.info("修改接收任务状态-接收明细文件完成, lockKey:{},取锁中.....", lockKey);
        RLock lock = redissonClient.getLock(lockKey);
        // Acquire with a 300 s lease.
        lock.lock(300, TimeUnit.SECONDS);
        log.info("取到锁");
        // NOTE(review): the lock is released inside this method, while the surrounding
        // @Transactional presumably commits after it returns — confirm this ordering is acceptable.
        try {
            FerryReceiveTaskEntity receiveTaskEntity = findByFerryTaskId(ferryTaskId);
            if (receiveTaskEntity != null) {
                receiveTaskEntity.setFileReceiveCount(receiveTaskEntity.getFileReceiveCount() + 1);
                // Check whether every file of the task has been received.
                long count = ferryReceiveTaskFileService.countNotCompleteByTaskId(receiveTaskEntity.getId());
                if (count == 0) {
                    log.info("明细文件都已完成接收,修改接收任务状态");
                    receiveTaskEntity.setTaskStatus(FerryReceiveTaskStatusEnum.RECEIVE_DETAIL_FILE_COMPLETE.getCode());
                }
                receiveTaskEntity.setUpdateTime(new Date());
                this.updateById(receiveTaskEntity);
            }
        } catch (Exception e) {
            log.error("修改接收任务状态-接收明细文件完成异常,{}", e.getMessage(), e);
        } finally {
            // Only unlock a lock this thread still owns: once the lease has expired another
            // thread may hold it, and unlocking then throws IllegalMonitorStateException.
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
            log.info("修改接收任务状态-接收明细文件完成,lockKey:{},解锁", lockKey);
        }
    }

    /**
     * Picks one task in {@code RECEIVE_DETAIL_FILE_COMPLETE} and moves it to
     * {@code MERGE_FILE_BEGIN}.
     *
     * @return the task that was picked (already updated), or {@code null} when there is none
     * @author linchunpeng
     * @date 2024/3/15
     */
    @Transactional
    public FerryReceiveTaskEntity queryReceiveDetailFileComplete() {
        log.info("查询是否有接收明细文件完成的任务");
        List<FerryReceiveTaskEntity> list = this.lambdaQuery()
                .eq(FerryReceiveTaskEntity::getTaskStatus, FerryReceiveTaskStatusEnum.RECEIVE_DETAIL_FILE_COMPLETE.getCode())
                .list();
        if (CollectionUtil.isEmpty(list)) {
            return null;
        }

        FerryReceiveTaskEntity receiveTaskEntity = list.get(0);
        log.info("修改接收任务状态为:合并文件开始");
        receiveTaskEntity.setTaskStatus(FerryReceiveTaskStatusEnum.MERGE_FILE_BEGIN.getCode());
        receiveTaskEntity.setUpdateTime(new Date());
        this.updateById(receiveTaskEntity);
        return receiveTaskEntity;
    }

    /**
     * Merges the split (child) files of every file returned by
     * {@code queryFerryCompleteList} back into its parent file, then batch-updates the records.
     *
     * @param receiveTaskEntity the task whose files are merged
     * @author linchunpeng
     * @date 2024/3/15
     */
    @Transactional
    public void mergeFile(FerryReceiveTaskEntity receiveTaskEntity) {
        log.info("遍历明细文件列表,合并文件");
        // Load the files that may have been split.
        List<FerryReceiveTaskFileEntity> fileEntityList =
                ferryReceiveTaskFileService.queryFerryCompleteList(receiveTaskEntity.getId());
        if (CollectionUtil.isEmpty(fileEntityList)) {
            return;
        }

        log.info("需要合并的文件数量:{}", fileEntityList.size());
        for (FerryReceiveTaskFileEntity fileEntity : fileEntityList) {
            log.info("文件id:{},子文件id:{}", fileEntity.getId(), fileEntity.getSplitFileIds());
            if (StringUtils.isBlank(fileEntity.getSplitFileIds())) {
                continue;
            }
            // The child ids are stored as a comma-separated string.
            List<Long> fileIds = Arrays.stream(fileEntity.getSplitFileIds().split(","))
                    .map(Long::parseLong)
                    .collect(Collectors.toList());
            List<FerryReceiveTaskFileEntity> childrenList = ferryReceiveTaskFileService.queryByIds(fileIds);
            if (CollectionUtil.isEmpty(childrenList)) {
                continue;
            }
            mergeChildren(fileEntity, childrenList);
        }
        ferryReceiveTaskFileService.updateBatchById(fileEntityList);
    }

    /**
     * Moves the task to {@code MERGE_FILE_COMPLETE}. Does nothing when the task does not exist.
     *
     * @param id primary key of the receive task
     * @author linchunpeng
     * @date 2024/3/15
     */
    @Transactional
    public void updateTaskStatusMergeFileComplete(Long id) {
        log.info("修改接收任务状态-合并文件完成");
        FerryReceiveTaskEntity receiveTaskEntity = this.getById(id);
        if (receiveTaskEntity != null) {
            receiveTaskEntity.setTaskStatus(FerryReceiveTaskStatusEnum.MERGE_FILE_COMPLETE.getCode());
            receiveTaskEntity.setUpdateTime(new Date());
            this.updateById(receiveTaskEntity);
        }
    }

    /**
     * Calls the ferry tool service to push the task's data, then moves the task to
     * {@code PUSH_DATA}. Does nothing when the task does not exist.
     *
     * @param id primary key of the receive task
     * @author linchunpeng
     * @date 2024/3/15
     */
    @Transactional
    public void pushData(Long id) {
        log.info("调用摆渡工具服务,推送数据");
        FerryReceiveTaskEntity receiveTaskEntity = this.getById(id);
        if (receiveTaskEntity != null) {
            retryService.postToBaiduutilServerPush(receiveTaskEntity);
            receiveTaskEntity.setTaskStatus(FerryReceiveTaskStatusEnum.PUSH_DATA.getCode());
            receiveTaskEntity.setUpdateTime(new Date());
            this.updateById(receiveTaskEntity);
        }
    }

    /**
     * Picks one task in {@code PUSH_DATA}, asks the ferry tool service for its push result and,
     * when the result carries a status, stores that status and result on the task.
     *
     * @author linchunpeng
     * @date 2024/3/15
     */
    @Transactional
    public void queryReceiveTaskIsComplete() {
        List<FerryReceiveTaskEntity> list = this.lambdaQuery()
                .eq(FerryReceiveTaskEntity::getTaskStatus, FerryReceiveTaskStatusEnum.PUSH_DATA.getCode())
                .list();
        FerryReceiveTaskEntity receiveTaskEntity = CollectionUtil.isNotEmpty(list) ? list.get(0) : null;
        log.info(receiveTaskEntity == null
                ? "没有最近一次执行推送操作的任务"
                : "最近一次执行推送操作的任务数据为:" + JSONObject.toJSONString(receiveTaskEntity));
        if (receiveTaskEntity == null) {
            return;
        }

        FerryReceiveTaskPushResult result = retryService.postToBaiduutilServerPushResult(receiveTaskEntity);
        if (result != null && result.getTaskStatus() != null) {
            receiveTaskEntity.setReceiveResult(result.getReceiveResult());
            receiveTaskEntity.setTaskStatus(result.getTaskStatus());
            receiveTaskEntity.setUpdateTime(new Date());
            this.updateById(receiveTaskEntity);
        }
    }

    /**
     * Looks for a task in {@code FERRY_COMPLETE}.
     *
     * @return one such task, or {@code null} when there is none
     * @author linchunpeng
     * @date 2024/3/15
     */
    @Transactional
    public FerryReceiveTaskEntity queryReceiveTaskComplete() {
        log.info("查询是否有摆渡完成的任务");
        List<FerryReceiveTaskEntity> list = this.lambdaQuery()
                .eq(FerryReceiveTaskEntity::getTaskStatus, FerryReceiveTaskStatusEnum.FERRY_COMPLETE.getCode())
                .list();
        return CollectionUtil.isNotEmpty(list) ? list.get(0) : null;
    }

    /**
     * Lists receive tasks that are not in {@code FERRY_COMPLETE} and were created before the
     * cut-off derived from the scan-error-info time interval in the scheduling configuration,
     * oldest first.
     *
     * @return matching tasks ordered by creation time ascending
     * @author linchunpeng
     * @date 2024/3/20
     */
    public List<FerryReceiveTaskEntity> queryOneHourNotCompleteList() {
        Date intervalTime = DateUtil.getBeforeNumMinuteTime(new Date(),
                schedulingConfig.getScanErrorInfo().getTimeInterval());
        return this.lambdaQuery()
                .ne(FerryReceiveTaskEntity::getTaskStatus, FerryReceiveTaskStatusEnum.FERRY_COMPLETE.getCode())
                .lt(FerryReceiveTaskEntity::getCreateTime, intervalTime)
                .orderByAsc(FerryReceiveTaskEntity::getCreateTime)
                .list();
    }

    /**
     * Tells whether more than 90 minutes have passed since the last completed task was updated
     * without any newer task having been created.
     *
     * @return {@code true} when the most recently created completed task was last updated more
     *         than 90 minutes ago and is still the most recently created task overall;
     *         {@code false} when it was updated within the last 90 minutes or a newer task
     *         exists; {@code true} when there is no completed task at all
     * @author linchunpeng
     * @date 2024/3/20
     */
    public boolean isNotCreateNewTask() {
        // Most recently created task that finished ferrying.
        List<FerryReceiveTaskEntity> completedList = this.lambdaQuery()
                .eq(FerryReceiveTaskEntity::getTaskStatus, FerryReceiveTaskStatusEnum.FERRY_COMPLETE.getCode())
                .orderByDesc(FerryReceiveTaskEntity::getCreateTime)
                .last("limit 1")
                .list();
        FerryReceiveTaskEntity lastComplete = CollectionUtil.isNotEmpty(completedList) ? completedList.get(0) : null;
        if (lastComplete == null) {
            // NOTE(review): kept from the original — no completed task is reported as
            // "no new task created"; confirm this is what callers expect.
            return true;
        }

        // Compare the elapsed time, not the absolute timestamp, against the 90-minute window.
        long elapsedMillis = System.currentTimeMillis() - lastComplete.getUpdateTime().getTime();
        if (elapsedMillis <= NEW_TASK_TIMEOUT_MILLIS) {
            return false;
        }

        // Only the newest task is needed, so do not load the whole table.
        List<FerryReceiveTaskEntity> newestList = this.lambdaQuery()
                .orderByDesc(FerryReceiveTaskEntity::getCreateTime)
                .last("limit 1")
                .list();
        FerryReceiveTaskEntity lastTask = CollectionUtil.isNotEmpty(newestList) ? newestList.get(0) : null;
        // Same record means nothing newer was created since the last completion.
        return lastTask != null && lastComplete.getId().longValue() == lastTask.getId().longValue();
    }

    /** Returns the first receive task with the given ferryTaskId, or {@code null} if none exists. */
    private FerryReceiveTaskEntity findByFerryTaskId(Long ferryTaskId) {
        List<FerryReceiveTaskEntity> list = this.lambdaQuery()
                .eq(FerryReceiveTaskEntity::getFerryTaskId, ferryTaskId)
                .list();
        return CollectionUtil.isNotEmpty(list) ? list.get(0) : null;
    }

    /** Builds the on-disk path of a task's ".ferry" task file inside its package directory. */
    private static String taskFilePath(String packageUrl, String ferryTaskId) {
        return packageUrl.concat(File.separator)
                .concat(CommonConstant.FERRY_TASK_FILE_PREFIX)
                .concat(ferryTaskId)
                .concat(".ferry");
    }

    /** Joins the child files into the parent file, records the outcome and deletes the child files. */
    private void mergeChildren(FerryReceiveTaskFileEntity fileEntity, List<FerryReceiveTaskFileEntity> childrenList) {
        List<File> fileList = childrenList.stream()
                .map(childrenFile -> new File(receiveConfig.getPackageUrl().concat(childrenFile.getFileUrl())))
                .collect(Collectors.toList());
        File parentFile = new File(receiveConfig.getPackageUrl().concat(fileEntity.getFileUrl()));
        long fileSize = FileSpiltMergeUtil.join(fileList.toArray(new File[0]), parentFile);
        if (fileSize > 0) {
            log.info("文件id:{},合并成功", fileEntity.getId());
            fileEntity.setMergeStatus(2);
        } else {
            fileEntity.setReceiveResult("合并失败");
            log.info("文件id:{},合并失败", fileEntity.getId());
        }
        fileEntity.setUpdateTime(new Date());

        // NOTE(review): child files are removed even when the merge failed (behaviour kept
        // from the original) — confirm that a failed merge never needs to be retried.
        for (File file : fileList) {
            if (file.exists() && !file.delete()) {
                log.warn("删除子文件失败:{}", file.getPath());
            }
        }
    }
}