package com.jiayue.ipfcst.client.Schedule; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.jiayue.ipfcst.client.domain.entity.FileCreateLog; import com.jiayue.ipfcst.client.utils.*; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.io.*; import java.net.URLDecoder; import java.text.SimpleDateFormat; import java.util.*; import java.util.stream.Collectors; /** * 气象文件下载任务 * * @version 1.0 * @since 2019/4/8 11:24 */ @Component @Configuration @EnableScheduling @Slf4j @EnableAsync public class Task { @Value("${downLoadFile.stationCode}") String applicationStationCode; @Value("${zxglPath}") String zxglPath; /** * 功能:Java读取txt文件的内容 * 步骤:1:先获得文件句柄 * 2:获得文件句柄当做是输入一个字节码流,需要对这个输入流进行读取 * 3:读取到输入流后,需要读取生成字节流 * 4:一行一行的输出。readline()。 * 备注:需要考虑的是异常情况 * * @param filePath */ public static String readTxtFile(String filePath) { String val = ""; try { String encoding = "GBK"; File file = new File(filePath); if (file.isFile() && file.exists()) { InputStreamReader read = new InputStreamReader( new FileInputStream(file), encoding);//考虑到编码格式 BufferedReader bufferedReader = new BufferedReader(read); String lineTxt = null; while ((lineTxt = bufferedReader.readLine()) != null) { val = val + lineTxt; } read.close(); } else { log.error("找不到指定的文件"); } } catch (Exception e) { log.error("读取文件内容出错", e); } return val; } /** * 根据反向隔离传输路径获取备份文件夹 * 默认为new同级的bak路径 * * @param path 反向隔离传输路径 */ public String getFxglModifyPath(String path, String targetPath) { String pathBak = ""; String lastPathStr = ""; if (StrUtil.endWithIgnoreCase(path, "/")) { String[] paths = path.split("/"); lastPathStr = paths[paths.length - 1]; } else { char separatorChar = '/'; lastPathStr = StringUtils.unqualify(path, separatorChar); } if ("new".equals(lastPathStr)) { pathBak = path.replace("new", targetPath).replace("ipp/bak", "ipp/new"); log.info("获取到反向隔离:{} 路径", targetPath); } else { try { throw new Exception("获取反向隔离备份路径失败,传输路径最后一层不为new"); } catch (Exception e) { throw new RuntimeException(e); } } return pathBak; // } } /** * v3 http下载文件请求 */ @Async // @Scheduled(fixedRate = 6000) @Scheduled(initialDelay = 45 * 1000, fixedRateString = "60000") void v3DataFilesDownload() { if ("否".equals(Constant.cacheClientConfig.getDqAndNwpFileTask())) { log.info("气象文件下载任务已关闭"); return; } log.info("气象文件下载任务开始执行"); Constant.filedownLoadList = new ArrayList<>(); String fxglFilePath = Constant.cacheClientConfig.getFxglPath(); log.info("缓存配置中的反向隔离文件发送路径:{}", fxglFilePath); if (StrUtil.isBlank(fxglFilePath)) { log.warn("反向隔离路径未配置,不执行气象文件下载任务。"); return; } String stationCode = Constant.cacheClientConfig.getStationCode(); List reqFilesList = new ArrayList<>(); //当applicationStationCode不等于""时 下载所有配置的场站 if ("".equals(applicationStationCode)) { reqFilesList = queryToDownfilesFromCloud(stationCode); if (stationCode.equals("J00192")) { reqFilesList.addAll(queryToDownfilesFromCloud("J00389")); reqFilesList.addAll(queryToDownfilesFromCloud("J00390")); } } else { String[] codes = applicationStationCode.split(","); for (String code : codes) { reqFilesList.addAll(queryToDownfilesFromCloud(code)); } } // if (reqFilesList.size() == 0) { // log.info("未从云端获得今日未下载的文件信息"); // return; // } //临时文件下载路径 String tempFilePath = checkTempFilePath(fxglFilePath); //遍历下载所有文件 try { for (FileCreateLog fileCreateLog : reqFilesList) { String curentFileName = fileCreateLog.getFileName(); //判断该文件是否已下载 if (!Constant.alreadyDownFilesList.contains(fileCreateLog.getStationCode() + curentFileName)) { //返回信息中有文件从minio中的下载路径,尝试直接下载,如果下载失败再从云端下载 Long size = downloadFileByMinioOrCloud(tempFilePath + File.separatorChar + fileCreateLog.getStationCode(), curentFileName, fileCreateLog.getId(), fileCreateLog.getFileDownloadUrl()); if (size > 0) { String filePath = tempFilePath + File.separatorChar + fileCreateLog.getStationCode() + File.separatorChar + curentFileName; log.info("成功下载文件到:{}", filePath); //文件下载成功缓存文件名称,避免再次下载 , Constant.alreadyDownFilesList.add(fileCreateLog.getStationCode() + curentFileName); //缓存文件信息,做后续回传文件传入内网状态使用 Constant.downLoadFilesStatusList.add(fileCreateLog); // if (curentFileName.contains("pointconfig")) { // //修唯外网处理气象数据配置,及重启client端 // toDealProtocol(filePath, fileCreateLog.getId()); // } else { //缓存文件路径,下载完成后移动文件到反向隔离发送路径 Constant.filedownLoadList.add(filePath); // } } else { log.error("文件:【{}】下载失败", fileCreateLog.getFileName()); } } else { log.info("文件:【{}】已经下载过了,不需要再下次,如有需要请手动下载或联系运维生成新文件。", fileCreateLog.getFileName()); } } } catch (Exception e) { log.error("文件下载失败:{}", e); } finally { //移动文件到相应反向隔离目录,向内网传输 if (Constant.filedownLoadList.size() > 0) { for (String fpath : Constant.filedownLoadList) { int startIndex = fpath.indexOf("J00"); int endIndex = startIndex + 6; String result = fpath.substring(startIndex, endIndex); UtilTools.mvFile(fpath, fxglFilePath + File.separator + result + File.separator + "new"); } } } log.info("气象文件下载任务执行结束"); } /** * 请求云端,获取今日未下载文件列表 * * @return */ private List queryToDownfilesFromCloud(String stationCode) { List reqFilesList = new ArrayList<>(); List dqListNow = new ArrayList<>(); List nwpListNow = new ArrayList<>(); List finalList = new ArrayList<>(); // Map postParms = new HashMap<>(2); // postParms.put("stationCode", stationCode); // postParms.put("sign", Md5Util.makeMd5(stationCode)); // 请求minio String url = "https://117.78.19.70:9010/client/getFileLogsForAio/"; // String body = HttpUtil.post(Constant.cacheClientConfig.getCloudAddr() + "getCurentDayUnDownLoadFile", postParms, 10000); HttpRequest httpRequest = HttpRequest.get(url + stationCode); httpRequest.setGlobalTimeout(20000); String body = httpRequest.execute().body(); JSONObject json = JSONUtil.parseObj(body); String code = json.get("code").toString(); String data = json.get("data").toString(); if ("0".equals(code) && data.length() > 0) { JSONArray array = JSONUtil.parseArray(data); reqFilesList = array.toList(FileCreateLog.class); String dateNow = DateUtil.format(new Date(), "yyyyMMdd"); if (CollectionUtil.isNotEmpty(reqFilesList)) { dqListNow = reqFilesList.stream().filter(f -> f.getFileName().contains("DQ_" + dateNow)).collect(Collectors.toList()); nwpListNow = reqFilesList.stream().filter(f -> f.getFileName().contains("NWP_" + dateNow)).collect(Collectors.toList()); if (CollectionUtil.isNotEmpty(dqListNow)) { // 降序 dqListNow.sort(Comparator.comparing(FileCreateLog::getFileName).reversed()); if (null != dqListNow && dqListNow.size() > 0) { // 取最新的一条数据 FileCreateLog fileCreateLog = dqListNow.get(0); finalList.add(fileCreateLog); } } else { log.error("下载minio原始RB文件 --> {} 失败,失败原因:{}", stationCode, "文件为空!"); } if (CollectionUtil.isNotEmpty(nwpListNow)) { // 降序 nwpListNow.sort(Comparator.comparing(FileCreateLog::getFileName).reversed()); if (null != dqListNow && dqListNow.size() > 0) { // 取最新的一条数据 FileCreateLog fileCreateLog = nwpListNow.get(0); finalList.add(fileCreateLog); } } else { log.error("下载minio原始RB文件 --> {} 失败,失败原因:{}", stationCode, "文件为空!"); } } else { log.error("下载minio原始RB文件 --> {} 失败,失败原因:{}", stationCode, "文件为空!"); } // if (null != reqFilesList && reqFilesList.size() > 0) { // //对文件进行筛选 // reqFilesList = screeningDownloadFils(reqFilesList); // } } return finalList; } /** * 下载文件,先从minio上下载,失败再从cloud下载 * 下载限时20秒 * * @param tempFilePath * @param curentFileName * @param id * @param fileDownloadUrl * @return */ private Long downloadFileByMinioOrCloud(String tempFilePath, String curentFileName, String id, String fileDownloadUrl) { Long size = 0L; // if (StrUtil.isNotBlank(fileDownloadUrl)) { // size = HttpUtil.downloadFile(fileDownloadUrl, new File(tempFilePath + File.separatorChar // + curentFileName), 20000); // if (size > 0) { // log.info("直接从minio下载文件成功:{}", curentFileName); // callBackFileDownloadStatusToCloud(id,"S"); // return size; // } // } String url = Constant.cacheClientConfig.getCloudAddr() + "downloadFileById?id=" + id + "&sign=" + Md5Util.makeMd5(id); size = HttpUtil.downloadFile(url, new File(tempFilePath + File.separatorChar + curentFileName), 20000); if (size > 0) { callBackFileDownloadStatusToCloud(id, "S"); log.info("从minio下载文件失败,改从云端下载文件成功:{}", curentFileName); return size; } //最后都没有成功下载文件回传异常 callBackFileDownloadStatusToCloud(id, "E"); return size; } /** * 回传云端文件下载状态请求 * * @param id * @param status */ private void callBackFileDownloadStatusToCloud(String id, String status) { Map postParms = new HashMap<>(3); postParms.put("id", id); postParms.put("type", status); postParms.put("sign", Md5Util.makeMd5(id)); String body = HttpUtil.post(Constant.cacheClientConfig.getCloudAddr() + "postBackDownFileStatus", postParms, 10000); log.info("文件下载成功,回传云端文件下载状态响应报文:{}", body); } /** * 回传云端文件下载文件传输到内网状态 * * @param id * @param status */ private void callBackTransInStatusToCloud(String id, String status) { Map postParms = new HashMap<>(3); postParms.put("id", id); postParms.put("type", status); postParms.put("sign", Md5Util.makeMd5(id)); String body = HttpUtil.post(Constant.cacheClientConfig.getCloudAddr() + "postBackIntranetStatus", postParms, 10000); log.info("文件传输内网状态响应报文:{}", body); } /** * 移动外网气象配置文件到当前程序下,并回传接收文件正常状态给云端 */ private void toDealProtocol(String filePath, String id) { try { //如果是protocol2file的配置文件 不需要放到反向隔离目录下 而是放到程序同级目录 //TODO 改变文件存放位置为上级目录 ../ UtilTools.mvFile(filePath, "../"); callBackTransInStatusToCloud(id, "S"); log.info("回传protocol2file配置文件传入内网信息,重启client"); Runtime.getRuntime().exec("service client restart"); } catch (Exception e) { log.error("获取到protocol2file的配置文件,在移动时发生异常", e); } } /** * 筛选待下载文件,不需要下载的文件剔除 * * @param list */ private List screeningDownloadFils(List list) { List resultList = new ArrayList<>(); if (null == Constant.cacheClientConfig.getDownLoadFileChoose() || "0".equals(Constant.cacheClientConfig.getDownLoadFileChoose())) { return list; } else if ("1".equals(Constant.cacheClientConfig.getDownLoadFileChoose())) {//只下载短期文件 log.debug("云端配置只下载短期文件"); list.stream().forEach(fileCreateLog -> { if (fileCreateLog.getFileName().contains("DQ")) { resultList.add(fileCreateLog); } }); } else if ("2".equals(Constant.cacheClientConfig.getDownLoadFileChoose())) { //只下载nwp文件 log.debug("云端配置只下载NWP文件"); list.stream().forEach(fileCreateLog -> { if (fileCreateLog.getFileName().contains("NWP")) { resultList.add(fileCreateLog); } }); } else { resultList.addAll(list); } return resultList; } /** * 检查临时路径,不存在则创建 * * @return */ private String checkTempFilePath(String fxglFilePath) { //反向隔离文件发送地址 fxglFilePath = UtilTools.judeDirExists(fxglFilePath); String dir = ""; if (fxglFilePath.endsWith(String.valueOf(File.separatorChar))) { dir = fxglFilePath.substring(0, fxglFilePath.length() - 1) + "temp"; } else { dir = fxglFilePath + "temp"; } return dir; } /** * http扫描文件是否传入内网 * 文件传送到内网成功后向云平台发送文件成功请求 * //-----------------------// * 2022-08-18 加入判断反向隔离传输bak文件夹是否为空, * 当new 和bak文件夹都不存在已下载的文件时,判定文件已传输到内网并向云端回传 */ @Async @Scheduled(initialDelay = 30 * 1000, fixedRateString = "120000") void scanTasksFxgl() { if ("否".equals(Constant.cacheClientConfig.getFxglTask())) { log.info("反向隔离扫描任务已关闭"); return; } log.info("开始执行定时任务:扫描反向隔离目录,查看文件是否传入内网"); String filePath = Constant.cacheClientConfig.getFxglPath(); if (null == filePath || filePath.length() == 0) { log.warn("反向隔离文件路径未配置,不执行扫描任务。"); return; } //遍历已下载的文件list 筛选已成功传入内网服务器的文件,发送传入成功标识 if (Constant.downLoadFilesStatusList == null || Constant.downLoadFilesStatusList.size() == 0) { log.warn("当前无下载文件,不需要扫描反向隔离路径"); return; } for (String stationCode : applicationStationCode.split(",")) { Date date = new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd"); String today = simpleDateFormat.format(date); String newFilePath = filePath + "/" + stationCode + "/" + "new/"; String analysisFilePath = filePath + "/" + stationCode + "/" + "success/" + today; //反向隔离new文件夹下文件列表 List newFilelist = getFxglFilesListByDirector(newFilePath); //反向隔离bak文件夹下文件列表 String bakFilePath = getFxglModifyPath(newFilePath, "bak"); List bakFilelist = getFxglFilesListByDirector(bakFilePath); //反向隔离传输错误文件夹 String errorFilePath = getFxglModifyPath(newFilePath, "error"); List errorFilelist = getFxglFilesListByDirector(errorFilePath); //获取解析成功文件夹下文件列表 List analysisFileList = getFxglFilesListByDirector(analysisFilePath); List fileCreateLogs = new ArrayList<>(); for (FileCreateLog fileCreateLog : Constant.downLoadFilesStatusList) { //判断不在新下载的文件夹中后: if (!newFilelist.contains(fileCreateLog.getFileName())) { //回传状态,默认失败 String goBackFlag = "E"; //文件传入bak目录说明文件下载成功,解析成功 if (analysisFileList.contains(fileCreateLog.getFileName())) { goBackFlag = "S"; log.info("文件:{} 默认传入内网成功,回传云平台此文件传输状态:{}", fileCreateLog.getFileName(), goBackFlag); } else if (errorFilelist.contains(fileCreateLog.getFileName())) { //在error文件夹下,向内网传输失败了 // log.info("文件:{} 传入内网失败,回传云平台此文件传输状态:{}", fileCreateLog.getFileName(), goBackFlag); } else { //找不到文件,日志提示,不进行回传状态 // log.warn("当前文件:{} 可能在移动中,不在new下 且未在bak或error文件夹下找到", fileCreateLog.getFileName()); continue; } callBackTransInStatusToCloud(fileCreateLog.getId(), goBackFlag); // fileCreateLogs.add(fileCreateLog); } else { // log.warn("文件:{} 还在new文件夹下面,未被传输移动,2分钟后再次检查", fileCreateLog.getFileName()); } } //清空下载文件列表中已经回传过文件传输状态的文件 if (null != fileCreateLogs && fileCreateLogs.size() > 0) { for (FileCreateLog fileCreateLog : fileCreateLogs) { Constant.downLoadFilesStatusList.remove(fileCreateLog); } } } } /** * 获取反向隔离相关路径下的文件列表 * * @param filePath * @return */ private List getFxglFilesListByDirector(String filePath) { //文件集合 List list = new ArrayList<>(); //路径不存在则进行创建 UtilTools.judeDirExists(filePath); File dirFile = new File(filePath); File[] files = dirFile.listFiles(); if (null == files) { try { throw new Exception("文件路径:" + filePath + "不存在"); } catch (Exception e) { throw new RuntimeException(e); } } for (File file : files) { list.add(file.getName()); } return list; } /** * 正向隔离扫描 */ @Async // @Scheduled(initialDelay = 40 * 1000, fixedRateString = "120000") void scanTasksZxgl() { if ("否".equals(Constant.cacheClientConfig.getZxglTask())) { log.info("正向隔离扫描任务已关闭"); return; } log.info("开始执行定时任务:扫描正向隔离目录"); String filePath = Constant.cacheClientConfig.getZxglPath(); if (null == filePath || filePath.length() == 0) { log.warn("正向隔离路径未配置,不执行定时任务"); return; } try { filePath = URLDecoder.decode(filePath, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } File dirFile = new File(filePath); //判断该目录是否存在,不存在时创建 if (!dirFile.exists()) { dirFile.mkdirs(); log.info("该【" + dirFile.getPath() + "】目录不存在,系统自动创建文件目录"); } File[] files = dirFile.listFiles(); String dayStr = new SimpleDateFormat("yyyyMMdd").format(new Date());//当前时间格式化为年月日 if (files != null && files.length > 0) { for (File file : files) { if (!file.isDirectory()) { //如果不是今天的文件,则删除该文件 if (!file.getName().substring(file.getName().indexOf("_") + 1, file.getName().indexOf("_") + 9).equals(dayStr)) { if (file.exists()) { file.delete(); log.info(file.getName() + "文件不是当天文件不符合要求 、移除该文件"); } continue; } else { //解析文件 try { if (file.getName().startsWith("STATUS_") || file.getName().startsWith("RP_")) { String val = readTxtFile(filePath + file.getName()); HttpUtil.get(val); moveFile(new File(filePath + file.getName()), filePath + File.separator + "bak" + File.separator); log.debug(file.getName() + "文件解析成功,移动到:" + filePath + File.separator + "bak" + File.separator); } else { file.delete(); log.error(file.getName() + "文件名不符合要求,删除。"); } } catch (Exception e) { log.error(file.getName() + "解析失败", e); } } } else { File[] filebaks = file.listFiles(); for (File filebak : filebaks) { if (!filebak.getName().substring(filebak.getName().indexOf("_") + 1, filebak.getName().indexOf("_") + 9).equals(dayStr)) { if (filebak.exists()) { filebak.delete(); log.info(filebak.getName() + "删除今天之前的历史文件"); } } } } } } else { log.info("该【" + dirFile.getPath() + "】目录下没有文件"); } } /** * 移动文件 * * @param file 文件 */ private void moveFile(File file, String path) { File destFile = new File(path + file.getName()); if (destFile.exists()) { destFile.delete(); } try { FileUtils.moveFile(file, destFile); } catch (IOException e) { log.error(file.getName() + "文件移动失败", e); } } @Scheduled(fixedRate = 60000l) public void realPowerForQr() { File zxglPathFile = new File(zxglPath + File.separator + "new"); log.info("开始扫描正向隔离回传目录{}", zxglPathFile.getPath()); String bakPath = zxglPath + File.separator + "bak"; if (zxglPathFile.exists()) { for (String s : zxglPathFile.list()) { List list; try { File file = new File(zxglPathFile.getPath() + File.separator + s); list = FileUtil.getFileContent(file); TaskResultRequestVO taskResultRequestVO = new TaskResultRequestVO(); taskResultRequestVO.setTaskResult(list.get(0)); String url = "https://api.jiayuepowertech.com:8080/task/result"; // 构造请求对象 HttpRequest request = HttpRequest.put(url).body(JsonBeanUtil.beanToJson(taskResultRequestVO)); // 发送请求并获取响应对象 HttpResponse response = request.execute(); FileUtils.moveFile(file, new File(bakPath + File.separator + file.getName())); log.info("回传实际功率状态=》{}", response.body()); } catch (Exception e) { e.printStackTrace(); } } } } @Data class TaskResultRequestVO { private static final long serialVersionUID = 1L; private String taskResult; } }