|
@@ -0,0 +1,251 @@
|
|
|
|
+package com.cpp.web.service.datafactory;
|
|
|
|
+
|
|
|
|
+import cn.hutool.core.util.CharsetUtil;
|
|
|
|
+import cn.hutool.extra.ftp.Ftp;
|
|
|
|
+import cn.hutool.extra.ftp.FtpConfig;
|
|
|
|
+import cn.hutool.extra.ftp.FtpMode;
|
|
|
|
+import cn.hutool.extra.ssh.JschUtil;
|
|
|
|
+import cn.hutool.extra.ssh.Sftp;
|
|
|
|
+import com.cpp.web.domain.AbnormalAlarm;
|
|
|
|
+import com.cpp.web.domain.datafactory.SftpChannel;
|
|
|
|
+import com.cpp.web.domain.datafactory.ParsingLog;
|
|
|
|
+import com.cpp.web.domain.datafactory.ParsingType;
|
|
|
|
+import com.cpp.web.domain.datafactory.dto.ParsingResultDto;
|
|
|
|
+import com.cpp.web.domain.enums.AlarmEnum;
|
|
|
|
+import com.cpp.web.domain.enums.DataSourcesEnum;
|
|
|
|
+import com.cpp.web.domain.station.ElectricField;
|
|
|
|
+import com.cpp.web.service.AbnormalAlarmService;
|
|
|
|
+import com.cpp.web.service.station.ElectricFieldService;
|
|
|
|
+import com.cpp.web.utils.MessageUtils;
|
|
|
|
+import com.jcraft.jsch.ChannelSftp;
|
|
|
|
+import com.jcraft.jsch.JSch;
|
|
|
|
+import com.jcraft.jsch.Session;
|
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.apache.commons.io.FileUtils;
|
|
|
|
+import org.apache.commons.lang3.time.DateFormatUtils;
|
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+
|
|
|
|
+import java.io.File;
|
|
|
|
+import java.io.IOException;
|
|
|
|
+import java.lang.reflect.Field;
|
|
|
|
+import java.nio.charset.Charset;
|
|
|
|
+import java.util.*;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * ftp文件解析
|
|
|
|
+ *
|
|
|
|
+ * @author tl
|
|
|
|
+ * @date 2022-05-11 09:51:21
|
|
|
|
+ */
|
|
|
|
+@AllArgsConstructor
|
|
|
|
+@Slf4j
|
|
|
|
+@Service
|
|
|
|
+public class SftpFileParsing {
|
|
|
|
+
|
|
|
|
+ private final ParsingTypeService parsingTypeService;
|
|
|
|
+
|
|
|
|
+ private final Map<String, ParsingInterface> parsingInterfaceMap;
|
|
|
|
+
|
|
|
|
+ private final SftpChannelService sftpChannelService;
|
|
|
|
+
|
|
|
|
+ private final ElectricFieldService electricFieldService;
|
|
|
|
+
|
|
|
|
+ private final ParsingLogService parsingLogService;
|
|
|
|
+
|
|
|
|
+ private final AbnormalAlarmService abnormalAlarmService;
|
|
|
|
+
|
|
|
|
+ private final String PARSING_FILE_TEMP_DIR = "/";
|
|
|
|
+ private final String PARSING_FILE_SUCCESS_DIR = "/";
|
|
|
|
+ private final String PARSING_FILE_FAIL_DIR = "/";
|
|
|
|
+
|
|
|
|
+ private final File fileTempDir = new File(PARSING_FILE_TEMP_DIR);
|
|
|
|
+
|
|
|
|
+ private final ThreadPoolTaskExecutor executor;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 解析文件得定时任务
|
|
|
|
+ * 逻辑:先配置识别文件名称类型:识别文件名称关键字对应的文件类型
|
|
|
|
+ * 有文件类型的配置才可以进入下一步
|
|
|
|
+ * <p>
|
|
|
|
+ * 查找各场站配置的ftp目录,去下载文件并识别文件类型执行对应的解析策略
|
|
|
|
+ * <p>
|
|
|
|
+ * 必要条件:1.配置文件识别类型。2.配置下载路径。3.配置解析公式
|
|
|
|
+ *
|
|
|
|
+ * @param
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ @Scheduled(fixedRate = 300000L)
|
|
|
|
+ public void parsingFileJob() {
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ log.info("-----------------开始执行FTP文件解析任务----------------------");
|
|
|
|
+
|
|
|
|
+ List<ParsingType> parsingTypes = parsingTypeService.list();
|
|
|
|
+
|
|
|
|
+ if (parsingTypes.size() > 0) {
|
|
|
|
+
|
|
|
|
+ List<SftpChannel> channels = sftpChannelService.list();
|
|
|
|
+
|
|
|
|
+ List<ElectricField> electricFields = electricFieldService.list();
|
|
|
|
+ for (SftpChannel channel : channels) {
|
|
|
|
+
|
|
|
|
+ List<ElectricField> channelElectricFields = electricFields.stream().filter(e -> e.getFtpChanelId() != null && e.getFtpChanelId().equals(channel.getId())).collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ if (channelElectricFields.size() > 0) {
|
|
|
|
+ executeSftpParsing(channel, parsingTypes, electricFields);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ } else {
|
|
|
|
+ log.info("未配置文件识别类型标识,无法进行下载解析");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log.info("-----------------执行FTP文件解析任务完成----------------------");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * sftp下载文件并解析文件的具体步骤
|
|
|
|
+ * @param parsingTypes
|
|
|
|
+ * @param channelElectricFields
|
|
|
|
+ * @param sftp
|
|
|
|
+ */
|
|
|
|
+ private void parsingFile(List<ParsingType> parsingTypes, List<ElectricField> channelElectricFields, Sftp sftp) {
|
|
|
|
+ List<ParsingLog> parsingLogs = new ArrayList<>();
|
|
|
|
+ List<AbnormalAlarm> abnormalAlarms = new ArrayList<>();
|
|
|
|
+ for (ParsingType parsingType : parsingTypes) {
|
|
|
|
+ executor.execute(new Runnable() {
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ ParsingInterface parsingInterface = parsingInterfaceMap.get(parsingType.getFileType().name() + "Parsing");
|
|
|
|
+ parsingInterface.activationParsingConf();//初始化解析配置
|
|
|
|
+ for (ElectricField electricField : channelElectricFields) {
|
|
|
|
+
|
|
|
|
+ String ftpUrl = electricField.getFtpUrl();
|
|
|
|
+ List<String> fileNames = sftp.ls(ftpUrl).stream().filter(f -> f.contains(parsingType.getFileName())).collect(Collectors.toList());
|
|
|
|
+ if (fileNames.size() > 0) {
|
|
|
|
+ for (String fileName : fileNames) {
|
|
|
|
+ try {
|
|
|
|
+ //下载文件到临时目录
|
|
|
|
+ sftp.download(ftpUrl, fileTempDir,fileName);
|
|
|
|
+ File file = FileUtils.getFile(fileTempDir, fileName);
|
|
|
|
+ ParsingLog parsingLog = new ParsingLog();
|
|
|
|
+ Date now = new Date();
|
|
|
|
+ parsingLog.setParsingTime(now);
|
|
|
|
+ ParsingResultDto parsingResultDto = parsingInterface.parsing(file, electricField.getStationCode());
|
|
|
|
+ parsingLog.setParsingDescribe(parsingResultDto.getMessage());
|
|
|
|
+ parsingLog.setFileType(parsingType.getFileType());
|
|
|
|
+ parsingLog.setDataSources(DataSourcesEnum.E1);
|
|
|
|
+ if (parsingResultDto.getStatus().equals("fail")) {
|
|
|
|
+ try {
|
|
|
|
+ File failFileDir = new File(PARSING_FILE_FAIL_DIR + File.separator + DateFormatUtils.format(now, "yyyy-MM-DD"));
|
|
|
|
+ File failFile = new File(failFileDir.getPath() + File.separator + fileName);
|
|
|
|
+ if (failFile.exists()) {
|
|
|
|
+ failFile.delete();
|
|
|
|
+ log.error("已有过解析失败文件,错误文件将覆盖!场站编号:{},文件名称:{}", electricField.getStationCode(), fileName);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ FileUtils.moveFile(file, failFileDir);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ log.error("解析文件失败后文件移动失败!场站编号:{},文件名称:{}", electricField.getStationCode(), fileName, e);
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ file.delete();//失败删除本地文件,等待下次下载
|
|
|
|
+ parsingLog.setParsingFileStatus("失败");
|
|
|
|
+
|
|
|
|
+ abnormalAlarms.add(new AbnormalAlarm(DataSourcesEnum.E3, AlarmEnum.E4, MessageUtils.format("无法解析场站端文件:{}", fileName), electricField.getStationCode()));
|
|
|
|
+
|
|
|
|
+ } else {
|
|
|
|
+ sftp.delFile(ftpUrl + "/" + fileName);//成功删除ftp上的文件
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ File successFileDir = new File(PARSING_FILE_SUCCESS_DIR + File.separator + DateFormatUtils.format(now, "yyyy-MM-DD"));
|
|
|
|
+ File successFile = new File(successFileDir.getPath() + File.separator + fileName);
|
|
|
|
+ if (successFile.exists()) {
|
|
|
|
+ successFile.delete();
|
|
|
|
+ log.error("已有过解析成功文件,成功文件将覆盖!场站编号:{},文件名称:{}", electricField.getStationCode(), fileName);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ FileUtils.moveFile(file, successFileDir);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ log.error("解析文件成功后文件移动失败!场站编号:{},文件名称:{}", electricField.getStationCode(), fileName, e);
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ file.delete();//失败删除本地文件,等待下次下载
|
|
|
|
+ parsingLog.setParsingFileStatus("成功");
|
|
|
|
+ }
|
|
|
|
+ parsingLogs.add(parsingLog);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ abnormalAlarms.add(new AbnormalAlarm(DataSourcesEnum.E3, AlarmEnum.E4, MessageUtils.format("无法解析场站端文件:{}", fileName), electricField.getStationCode()));
|
|
|
|
+ log.error("下载并解析文件{}时异常", fileName, e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ parsingLogService.saveBatch(parsingLogs);
|
|
|
|
+
|
|
|
|
+ abnormalAlarmService.saveBatch(abnormalAlarms);
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 创建sftp 并解析
|
|
|
|
+ *
|
|
|
|
+ * @param sftpChannel
|
|
|
|
+ * @param parsingTypes
|
|
|
|
+ * @param channelElectricFields
|
|
|
|
+ */
|
|
|
|
+ public void executeSftpParsing(SftpChannel sftpChannel, List<ParsingType> parsingTypes, List<ElectricField> channelElectricFields) {
|
|
|
|
+ // SFTP方式上报
|
|
|
|
+ Sftp sftp = null;
|
|
|
|
+ JSch jsch = new JSch();
|
|
|
|
+ ChannelSftp channel = null;
|
|
|
|
+ Session sshSession = null;
|
|
|
|
+ try {
|
|
|
|
+ try {
|
|
|
|
+ sshSession = jsch.getSession(sftpChannel.getUsername(), sftpChannel.getIp(), sftpChannel.getPort());
|
|
|
|
+ sshSession.setPassword(sftpChannel.getPassword());
|
|
|
|
+ sshSession.setTimeout(30000);
|
|
|
|
+ Properties sshConfig = new Properties();
|
|
|
|
+ sshConfig.put("StrictHostKeyChecking", "no");
|
|
|
|
+ sshSession.setConfig(sshConfig);
|
|
|
|
+ log.info("sftp开始连接...");
|
|
|
|
+ sshSession.connect();
|
|
|
|
+ channel = (ChannelSftp) sshSession.openChannel("sftp");
|
|
|
|
+ channel.connect();
|
|
|
|
+ Class cl = ChannelSftp.class;
|
|
|
|
+ Field f = cl.getDeclaredField("server_version");
|
|
|
|
+ f.setAccessible(true);
|
|
|
|
+ f.set(channel, 2);
|
|
|
|
+ channel.setFilenameEncoding("UTF-8");
|
|
|
|
+ sftp = new Sftp(channel, CharsetUtil.CHARSET_UTF_8);
|
|
|
|
+
|
|
|
|
+ parsingFile(parsingTypes, channelElectricFields, sftp);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error("sftp连接异常:" + e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ } finally {
|
|
|
|
+ if (sftp != null) {
|
|
|
|
+ log.info("sftp客户端关闭");
|
|
|
|
+ JschUtil.closeAll();
|
|
|
|
+ sftp.close();
|
|
|
|
+ channel.quit();
|
|
|
|
+ channel.disconnect();
|
|
|
|
+ sshSession.disconnect();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+}
|