package com.bcxin.ferry.service; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.ZipUtil; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.bcxin.ferry.common.CommonConstant; import com.bcxin.ferry.common.emus.FerryTaskStatusEnum; import com.bcxin.ferry.common.utils.DateUtil; import com.bcxin.ferry.common.utils.IdGeneratorSnowflake; import com.bcxin.ferry.common.utils.ObsUtil; import com.bcxin.ferry.configs.BaiduutilServerConfig; import com.bcxin.ferry.configs.SchedulingConfig; import com.bcxin.ferry.dao.mapper.FerryTaskMapper; import com.bcxin.ferry.dtos.FerryDto; import com.bcxin.ferry.dtos.FerryFileCallbackDto; import com.bcxin.ferry.dtos.FerryTaskInfoDto; import com.bcxin.ferry.dtos.baiduutil.FerryTaskPullResult; import com.bcxin.ferry.entity.FerryTaskEntity; import com.bcxin.ferry.entity.FerryTaskFileEntity; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.io.File; import java.util.Date; import java.util.List; import java.util.concurrent.TimeUnit; /** * 摆渡任务表(ferry_task)服务实现类 * @author : linchunpeng * @date : 2024-3-6 */ @Slf4j @Service public class FerryTaskService extends ServiceImpl { @Autowired private SchedulingConfig schedulingConfig; @Autowired private RetryService retryService; @Autowired private FerryTaskFileService ferryTaskFileService; @Autowired private IdGeneratorSnowflake snowflake; @Autowired private RedissonClient redissonClient; @Value("${spring.profiles.active}") private String activeFile; @Autowired private BaiduutilServerConfig baiduutilServerConfig; /** * description:创建摆渡任务 * author:linchunpeng * date:2024/3/11 */ @Transactional public void createFerryTask() { log.info("开始创建摆渡任务"); Date nowTime = new Date(); //取出最后一次摆渡任务 LambdaQueryChainWrapper lqw = this.lambdaQuery(); lqw.orderByDesc(FerryTaskEntity::getCreateTime); lqw.last("limit 1"); List list = lqw.list(); FerryTaskEntity lastTask = CollectionUtil.isNotEmpty(list) ? list.get(0) : null; log.info(lastTask == null ? "没有最近一次任务" : "最近一次的任务数据为:" + JSONObject.toJSONString(lastTask)); //判断上次任务已完成 if (lastTask != null && lastTask.getTaskStatus() != FerryTaskStatusEnum.FERRY_TASK_COMPLETE.getCode()) { //最近一次摆渡任务未完成 log.info("最近一次摆渡任务未完成"); //2个小时毫秒数 long intervalTime = 1000 * 60 * 60 * 2; if (nowTime.getTime() - lastTask.getCreateTime().getTime() > intervalTime) { //最近一次任务的创建时间>2小时 log.info("最近一次任务的创建时间>2小时"); //预警 } } else { //最近一次摆渡任务已完成 log.info("最近一次摆渡任务已完成 || 没有最近一次任务"); Date pullDataBeginTime = DateUtil.getMinuteZeroTime(new Date()); if (lastTask != null) { pullDataBeginTime = lastTask.getEndTime(); } Date pullDataEndTime = DateUtil.getAfterNumMinuteTime(pullDataBeginTime, schedulingConfig.getCreateFerryTask().getTimeInterval()); log.info("新的任务拉取数据开始时间:{},拉取数据开始时间:{}", DateUtil.formatDateTime(pullDataBeginTime), DateUtil.formatDateTime(pullDataEndTime)); if (pullDataEndTime.getTime() >= nowTime.getTime()) { //新任务的结束时间 >= 当前时间,无法创建任务,等待下次定时任务启动 log.info("新任务的结束时间 >= 当前时间,无法创建任务,等待下次定时任务启动"); } else { //新任务的结束时间 < 当前时间 log.info("新任务的结束时间 < 当前时间,可以创建新的任务"); FerryTaskEntity ferryTaskEntity = new FerryTaskEntity(); ferryTaskEntity.setId(snowflake.snowflakeId()); ferryTaskEntity.setRegionCode(baiduutilServerConfig.getRegionCodes()); ferryTaskEntity.setStartTime(pullDataBeginTime); ferryTaskEntity.setEndTime(pullDataEndTime); ferryTaskEntity.setTaskStatus(FerryTaskStatusEnum.BEGIN_PULL.getCode()); ferryTaskEntity.setCreateTime(nowTime); ferryTaskEntity.setUpdateTime(nowTime); this.save(ferryTaskEntity); //调用摆渡工具服务,拉取数据 retryService.postToBaiduutilServerPull(ferryTaskEntity); } } } /** * description:查询摆渡任务状态-拉取完成 * author:linchunpeng * date:2024/3/11 */ @Transactional public void queryPullIsComplete() { Date nowTime = new Date(); //查询最近一条,拉取数据成功的摆渡任务 LambdaQueryChainWrapper lqw = this.lambdaQuery(); lqw.eq(FerryTaskEntity::getTaskStatus, FerryTaskStatusEnum.BEGIN_PULL.getCode()); lqw.isNull(FerryTaskEntity::getRequestId); lqw.isNull(FerryTaskEntity::getPackageUrl); lqw.orderByDesc(FerryTaskEntity::getCreateTime); lqw.last("limit 1"); List list = lqw.list(); FerryTaskEntity lastTask = CollectionUtil.isNotEmpty(list) ? list.get(0) : null; log.info(lastTask == null ? "没有最近一次开始拉取的任务" : "最近一次开始拉取的任务数据为:" + JSONObject.toJSONString(lastTask)); long intervalTime = 1000 * 60 * 60 * 5; if (lastTask != null && nowTime.getTime() - lastTask.getCreateTime().getTime() < intervalTime) { //开始拉取完成的任务不为空,且创建时间小于5个小时,可以开始查询拉取状态 //调用摆渡工具服务,拉取数据 FerryTaskPullResult result = retryService.postToBaiduutilServerPullResult(lastTask); //结果 if (result.getTaskStatus() != null) { FerryTaskEntity ferryTask = this.getById(result.getId()); ferryTask.setRequestId(result.getRequestId()); ferryTask.setPackageUrl(result.getPackageUrl()); ferryTask.setPullResult(result.getPullResult()); ferryTask.setTaskStatus(result.getTaskStatus()); ferryTask.setUpdateTime(new Date()); this.updateById(ferryTask); } } } /** * description:查询摆渡任务状态-拉取完成 * author:linchunpeng * date:2024/3/11 */ @Transactional public void queryPullComplete() { //分布式锁key String lockKey = "REDISSON_LOCK_QUERY_FERRY_TASK_STATUS"; //取锁 log.info("查询摆渡任务状态任务, lockKey:{},取锁中.....", lockKey); RLock lock = redissonClient.getLock(lockKey); //加锁,并设置过期时间 300s lock.lock(900, TimeUnit.SECONDS); log.info("取到锁"); try { Date nowTime = new Date(); log.info("当前时间戳:{}", nowTime.getTime()); //查询最近一条,拉取数据成功的摆渡任务 LambdaQueryChainWrapper lqw = this.lambdaQuery(); lqw.eq(FerryTaskEntity::getTaskStatus, FerryTaskStatusEnum.PULL_COMPLETE.getCode()); lqw.isNotNull(FerryTaskEntity::getRequestId); lqw.isNotNull(FerryTaskEntity::getPackageUrl); lqw.orderByDesc(FerryTaskEntity::getCreateTime); lqw.last("limit 1"); List list = lqw.list(); FerryTaskEntity lastTask = CollectionUtil.isNotEmpty(list) ? list.get(0) : null; log.info(lastTask == null ? "没有最近一次拉取完成的任务" : "最近一次拉取完成的任务数据为:" + JSONObject.toJSONString(lastTask)); long intervalTime = 1000 * 60 * 60 * 2; if (lastTask != null && lastTask.getUpdateTime() != null && nowTime.getTime() - lastTask.getUpdateTime().getTime() < intervalTime) { //拉取完成的任务不为空,且拉取完成时间小于2个小时,可以开始下载文件 boolean unzipResult = true;//除了北京政务外网,其他都是默认解压成功 String ferryPackageUrl = lastTask.getPackageUrl().concat(File.separator).concat(lastTask.getRequestId()); log.info("activeFile:{}", activeFile); if ("out-prod".equals(activeFile)) { log.info("北京政务外网,需要下载zip包"); //北京政务外网,需要下载zip包 String zipPath = ferryPackageUrl.concat(".zip"); int downloadCount = 0 ; boolean downloadFileResult = false; while (!downloadFileResult && downloadCount < 10) { //预防下载失败 downloadFileResult = ObsUtil.downloadFile(zipPath, zipPath.substring(zipPath.indexOf("baidu/"))); downloadCount++; log.info("第{}次下载OBS,下载:{}", downloadCount, downloadFileResult); if (!downloadFileResult) { log.info("下载失败,等待2分钟"); try { Thread.sleep(120000); } catch (InterruptedException e) { e.printStackTrace(); } } } if (downloadFileResult) { //解压zip File zipFile = new File(zipPath); if (!zipFile.exists()) { //zip文件不存在 log.error("zip文件不存在"); unzipResult = false; } //解压文件 File unzip = ZipUtil.unzip(zipFile); if (!unzip.exists()) { //zip解压失败 log.error("zip解压失败"); unzipResult = false; } log.info("解压结束"); } else { log.error("下载摆渡包失败"); } } else { log.info("不是北京政务外网,不需要需要下载zip包"); } if (unzipResult) { //可以开始扫描文件 List fileList = FileUtil.loopFiles(ferryPackageUrl); if (CollectionUtil.isNotEmpty(fileList)) { log.info("开始初始化文件"); for (File file : fileList) { if (file.isHidden()) { continue; } ferryTaskFileService.createFerryTaskFile(lastTask, file); } lastTask.setTaskStatus(FerryTaskStatusEnum.SCAN_FILE_COMPLETE.getCode()); log.info("初始化文件结束"); log.info("开始生成摆渡任务信息文件"); this.createFerryTaskInfoFile(lastTask); lastTask.setTaskStatus(FerryTaskStatusEnum.CREATE_TASK_FILE_COMPLETE.getCode()); lastTask.setUpdateTime(new Date()); this.updateById(lastTask); log.info("生成摆渡任务信息文件结束"); } } } else { log.info("没有最后一个任务或者超过2个小时"); } } catch (Exception e) { e.printStackTrace(); log.error("查询摆渡任务状态任务异常,{}", e.getMessage(), e); } finally { if (lock.isLocked()) { lock.unlock(); } log.info("查询摆渡任务状态任务,lockKey:{},解锁", lockKey); } } /** * description:生成摆渡任务信息文件 * author:linchunpeng * date:2024/3/12 */ private void createFerryTaskInfoFile(FerryTaskEntity ferryTaskEntity) { String fileName = ferryTaskEntity.getPackageUrl().concat(File.separator).concat(ferryTaskEntity.getRequestId()) .concat(File.separator) .concat(CommonConstant.FERRY_TASK_FILE_PREFIX) .concat(ferryTaskEntity.getId().toString()).concat(".ferry"); File infoFile = new File(fileName); ferryTaskFileService.createFerryTaskFile(ferryTaskEntity, infoFile); List ferryTaskFileEntityList = ferryTaskFileService.queryByTaskId(ferryTaskEntity.getId()); FerryTaskInfoDto dto = new FerryTaskInfoDto(); //存放任务信息 dto.setFerryTaskEntity(ferryTaskEntity); //存放文件信息 dto.setFerryTaskFileEntityList(ferryTaskFileEntityList); FileUtil.appendUtf8String(JSONObject.toJSONString(dto), infoFile); } /** * description:调用边界服务执行摆渡任务信息文件 * author:linchunpeng * date:2024/3/13 */ @Transactional public void sendFerryTaskFile() { //分布式锁key String lockKey = "REDISSON_LOCK_SEND_FERRY_TASK_FILE"; //取锁 log.info("调用边界服务执行摆渡任务信息文件, lockKey:{},取锁中.....", lockKey); RLock lock = redissonClient.getLock(lockKey); //加锁,并设置过期时间 300s lock.lock(300, TimeUnit.SECONDS); log.info("取到锁"); try { Date nowTime = new Date(); //查询最近一条,生成任务文件完成的摆渡任务 LambdaQueryChainWrapper lqw = this.lambdaQuery(); lqw.eq(FerryTaskEntity::getTaskStatus, FerryTaskStatusEnum.CREATE_TASK_FILE_COMPLETE.getCode()); lqw.orderByDesc(FerryTaskEntity::getCreateTime); lqw.last("limit 1"); List list = lqw.list(); FerryTaskEntity lastTask = CollectionUtil.isNotEmpty(list) ? list.get(0) : null; log.info(lastTask == null ? "没有最近一次生成任务文件完成的任务" : "最近一次生成任务文件完成的任务数据为:" + JSONObject.toJSONString(lastTask)); long intervalTime = 1000 * 60 * 60 * 2; if (lastTask != null && lastTask.getUpdateTime() != null && nowTime.getTime() - lastTask.getUpdateTime().getTime() < intervalTime) { //生成任务文件完成的任务不为空,且生成任务文件完成的时间小于2个小时,可以开始摆渡任务文件 log.info("开始摆渡任务信息文件"); ferryTaskFileService.ferryTaskFile(lastTask.getId()); lastTask.setTaskStatus(FerryTaskStatusEnum.FERRY_TASK_FILE_BEGIN.getCode()); lastTask.setUpdateTime(new Date()); this.updateById(lastTask); log.info("开始摆渡任务信息文件结束"); } } catch (Exception e) { e.printStackTrace(); log.error("调用边界服务执行摆渡任务信息文件异常,{}", e.getMessage(), e); } finally { if (lock.isLocked()) { lock.unlock(); } log.info("调用边界服务执行摆渡任务信息文件,lockKey:{},解锁", lockKey); } } /** * description:修改摆渡任务文件结果 * author:linchunpeng * date:2024/3/13 */ @Transactional public Long updateFerryTaskFileResult(FerryFileCallbackDto callbackDto) { FerryTaskFileEntity fileEntity = ferryTaskFileService.getById(Long.parseLong(callbackDto.getFileId())); log.info(fileEntity == null ? "摆渡任务文件不存在" : "摆渡任务文件数据为:" + JSONObject.toJSONString(fileEntity)); if (fileEntity != null) { log.info("修改摆渡任务文件结果"); fileEntity.setFileStatus(callbackDto.getFileStatus()); fileEntity.setFerryResult(callbackDto.getFerryResult()); fileEntity.setUpdateTime(new Date()); ferryTaskFileService.updateById(fileEntity); //修改任务状态 log.info("修改任务状态:{}", FerryTaskStatusEnum.FERRY_TASK_FILE_COMPLETE.getCode()); FerryTaskEntity ferryTaskEntity = this.getById(fileEntity.getTaskId()); ferryTaskEntity.setTaskStatus(FerryTaskStatusEnum.FERRY_TASK_FILE_COMPLETE.getCode()); ferryTaskEntity.setUpdateTime(new Date()); this.updateById(ferryTaskEntity); return ferryTaskEntity.getId(); } return null; } /** * description:摆渡任务文件完成,调用边界服务执行摆渡任务明细文件 * author:linchunpeng * date:2024/3/13 */ @Transactional public void sendFerryDetailFile(Long taskId) { //分布式锁key String lockKey = "REDISSON_LOCK_SEND_FERRY_DETAIL_FILE"; //取锁 log.info("摆渡任务文件完成,调用边界服务执行摆渡任务明细文件, lockKey:{},取锁中.....", lockKey); RLock lock = redissonClient.getLock(lockKey); //加锁,并设置过期时间 300s lock.lock(300, TimeUnit.SECONDS); log.info("取到锁"); try { FerryTaskEntity ferryTaskEntity = this.getById(taskId); if (ferryTaskEntity != null) { //摆渡任务文件完成的任务不为空,可以开始摆渡明细文件 log.info("开始摆渡任务明细文件"); ferryTaskEntity.setTaskStatus(FerryTaskStatusEnum.FERRY_DETAIL_FILE_BEGIN.getCode()); ferryTaskEntity.setUpdateTime(new Date()); this.updateById(ferryTaskEntity); log.info("开始摆渡任务明细文件结束"); } } catch (Exception e) { e.printStackTrace(); log.error("摆渡任务文件完成,调用边界服务执行摆渡任务明细文件异常,{}", e.getMessage(), e); } finally { if (lock.isLocked()) { lock.unlock(); } log.info("摆渡任务文件完成,调用边界服务执行摆渡任务明细文件,lockKey:{},解锁", lockKey); } } /** * description:修改摆渡明细文件结果 * author:linchunpeng * date:2024/3/13 */ @Transactional public Long updateFerryDetailFileResult(FerryFileCallbackDto callbackDto) { FerryTaskFileEntity fileEntity = ferryTaskFileService.getById(Long.parseLong(callbackDto.getFileId())); log.info(fileEntity == null ? "摆渡明细文件不存在" : "摆渡明细文件数据为:" + JSONObject.toJSONString(fileEntity)); if (fileEntity != null) { fileEntity.setFileStatus(callbackDto.getFileStatus()); fileEntity.setFerryResult(callbackDto.getFerryResult()); fileEntity.setUpdateTime(new Date()); ferryTaskFileService.updateById(fileEntity); return fileEntity.getTaskId(); } return null; } /** * description:修改任务状态为:摆渡明细文件结束 * author:linchunpeng * date:2024/3/13 */ @Transactional public void updateFerryDetailFileComplete(Long taskId) { //分布式锁key String lockKey = "REDISSON_LOCK_UPDATE_FERRY_TASK_STATUS_" + taskId.toString(); //取锁 log.info("修改任务状态为:摆渡明细文件结束, lockKey:{},取锁中.....", lockKey); RLock lock = redissonClient.getLock(lockKey); //加锁,并设置过期时间 300s lock.lock(300, TimeUnit.SECONDS); log.info("取到锁"); try { long nullFerryResultCount = ferryTaskFileService.countNullFerryResultByTaskId(taskId); log.info("查询还有多少个明细文件未摆渡完成,nullFerryResultCount:{}", nullFerryResultCount); if (nullFerryResultCount == 0) { log.info("摆渡明细文件都完成了,修改任务状态"); //修改任务状态 FerryTaskEntity ferryTaskEntity = this.getById(taskId); if (ferryTaskEntity.getTaskStatus() == FerryTaskStatusEnum.FERRY_DETAIL_FILE_BEGIN.getCode()) { ferryTaskEntity.setTaskStatus(FerryTaskStatusEnum.FERRY_DETAIL_FILE_COMPLETE.getCode()); ferryTaskEntity.setUpdateTime(new Date()); this.updateById(ferryTaskEntity); } } } catch (Exception e) { e.printStackTrace(); log.error("修改任务状态为:摆渡明细文件结束异常,{}", e.getMessage(), e); } finally { if (lock.isLocked()) { lock.unlock(); } log.info("修改任务状态为:摆渡明细文件结束,lockKey:{},解锁", lockKey); } } /** * description:修改摆渡任务完成 * author:linchunpeng * date:2024/3/13 */ @Transactional public void updateFerryTaskComplete(FerryDto ferryDto) { log.info("修改摆渡任务完成"); FerryTaskEntity ferryTaskEntity = this.getById(Long.parseLong(ferryDto.getFerryTaskId())); if (ferryTaskEntity != null) { ferryTaskEntity.setTaskStatus(FerryTaskStatusEnum.FERRY_TASK_COMPLETE.getCode()); ferryTaskEntity.setFerryResult(ferryDto.getFerryTaskResult()); ferryTaskEntity.setUpdateTime(new Date()); this.updateById(ferryTaskEntity); } } /** * description:查询大于一小时还未摆渡完成的任务 * author:linchunpeng * date:2024/3/20 */ public List queryOneHourNotCompleteList() { //大于一小时还未摆渡完成的任务 Date intervalTime = DateUtil.getBeforeNumMinuteTime(new Date(), schedulingConfig.getScanErrorInfo().getTimeInterval()); LambdaQueryChainWrapper lqw = this.lambdaQuery(); lqw.ne(FerryTaskEntity::getTaskStatus, FerryTaskStatusEnum.FERRY_TASK_COMPLETE.getCode()); lqw.lt(FerryTaskEntity::getCreateTime, intervalTime); lqw.orderByAsc(FerryTaskEntity::getCreateTime); return lqw.list(); } /** * description:上个任务完成时间是否大于一个半小时还未生成新任务? * author:linchunpeng * date:2024/3/20 */ public boolean isNotCreateNewTask() { //最后一个摆渡完成的任务 LambdaQueryChainWrapper lqw = this.lambdaQuery(); lqw.eq(FerryTaskEntity::getTaskStatus, FerryTaskStatusEnum.FERRY_TASK_COMPLETE.getCode()); lqw.orderByDesc(FerryTaskEntity::getCreateTime); lqw.last("limit 1"); List list = lqw.list(); FerryTaskEntity lastComplete = CollectionUtil.isNotEmpty(list) ? list.get(0) : null; if (lastComplete != null && lastComplete.getUpdateTime().getTime() > (90*60+1000)) { //最后完成的任务 && 最后修改时间 > 90分钟 LambdaQueryChainWrapper lqw2 = this.lambdaQuery(); lqw2.orderByDesc(FerryTaskEntity::getCreateTime); List list2 = lqw2.list(); FerryTaskEntity lastTask = CollectionUtil.isNotEmpty(list2) ? list2.get(0) : null; if (lastTask != null && lastComplete.getId().longValue() == lastTask.getId().longValue()) { //同一个 return true; } else { return false; } } return true; } }