Browse Source

新增sftp下载解析方式

tl 21 giờ trước cách đây
mục cha
commit
98f7c3d984

+ 7 - 1
ipp-idp/pom.xml

@@ -101,7 +101,13 @@
             <artifactId>spring-boot-starter-thymeleaf</artifactId>
             <version>2.1.6.RELEASE</version>
         </dependency>
-
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>0.1.55</version>
+            <scope>compile</scope>
+            <optional>true</optional>
+        </dependency>
     </dependencies>
     <profiles>    <!--考虑到window 和linux环境 npm命令格式的问题,使用maven的profile实现动态指定命令-->
         <profile>

+ 5 - 0
ipp-idp/src/main/java/com/jiayue/ipp/idp/job/ParsingJob.java

@@ -6,6 +6,7 @@ import com.jiayue.ipp.idp.service.an.DownloadService;
 import com.jiayue.ipp.idp.service.an.FTPService;
 import com.jiayue.ipp.idp.service.an.ParsingChannelService;
 //import com.jiayue.ipp.idp.util.RedisUtil;
+import com.jiayue.ipp.idp.service.an.SFTPService;
 import lombok.AllArgsConstructor;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
@@ -19,6 +20,7 @@ public class ParsingJob {
     private final ParsingChannelService parsingChannelService;
     private final DownloadService downloadService;
     private final FTPService ftpService;
+    private final SFTPService sftpService;
 //    private final RedisUtil redisUtil;
 
 //    @XxlJob("parsingJob")
@@ -54,6 +56,9 @@ public class ParsingJob {
                         // 解析自己家的云端预测文件
                         downloadService.parsing(parsingChannel);
                         break;
+                    case "E4":
+                        sftpService.sftp(parsingChannel);
+                        break;
                     default:
                         break;
                 }

+ 136 - 0
ipp-idp/src/main/java/com/jiayue/ipp/idp/service/an/SFTPService.java

@@ -0,0 +1,136 @@
+package com.jiayue.ipp.idp.service.an;
+
+
+import cn.hutool.extra.ssh.Sftp;
+import com.jiayue.ipp.common.data.entity.an.ParsingChannel;
+import com.jiayue.ipp.common.data.entity.an.ParsingLog;
+import com.jiayue.ipp.common.data.entity.an.ParsingUrl;
+import com.jiayue.ipp.idp.dto.FileAnalysisStatusDto;
+import com.jiayue.ipp.idp.util.FileUtil;
+import com.jiayue.ipp.idp.util.sftp.SftpTool;
+import com.jiayue.ipp.idp.util.sftp.SftpUtil;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.time.DateFormatUtils;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.ResourceUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * ftp通信业务层
+ *
+ * @author tl
+ * @date 2022-05-11 09:51:21
+ */
+@RequiredArgsConstructor
+@Slf4j
+@Service
+@Transactional
+public class SFTPService {
+
+    private final ParsingUrlService parsingUrlService;
+
+    private final ParsingFileService parsingFileService;
+
+    private final ParsingLogService parsingLogService;
+
+
+    /**
+     * 逻辑过程:
+     * 获取ftp解析的通道进行ftp连接
+     * 通过ftp的下载路径下载对应场站和预测厂家目录中的文件到本地目录
+     * 进行文件解析
+     * 解析成功后,文件上传到minio
+     * 将记录存入数据库
+     * 备份并删除本地文件(备份文件应定时清理)
+     * 解析成功并结束
+     */
+    public void sftp(ParsingChannel sftpParsingChannel) {
+
+        List<ParsingUrl> ftpParsingUrls = parsingUrlService.list();
+        //ftp连接
+        SftpTool sftpTool = SftpUtil.createSftp(sftpParsingChannel);
+
+        //判断是否连接成功,成功后业务继续
+        if (sftpTool.getSftp() != null) {
+            //过滤出当前解析通道下在使用的《 解析路径 》
+            List<ParsingUrl> ftpParsingUrlList = ftpParsingUrls.stream().filter(s -> s.getCId().equals(sftpParsingChannel.getId()) && s.getUrlStatus().equals("1")).collect(Collectors.toList());
+
+            try {
+                Sftp sftp = sftpTool.getSftp();
+                String dateDir = DateFormatUtils.format(new Date(), "yyyyMMdd");
+                //遍历解析路径,对文件进行解析
+                for (ParsingUrl ftpParsingUrl : ftpParsingUrlList) {
+                    String url = ftpParsingUrl.getUrl();
+                    List<String> strings = sftp.ls(url);
+                    //遍历文件
+                    int i = 0;
+                    for (String fileName : strings) {
+                        if (i > 20) {
+                            break;
+                        }
+                        i++;
+                        try {
+                            ParsingLog parsingLog = new ParsingLog();
+                            parsingLog.setFileName(fileName);
+                            parsingLog.setStationCode(ftpParsingUrl.getStationCode());
+                            parsingLog.setParsingTime(new Date());
+                            // 从ftp下载到本地parsing目录里
+                            String path = FileUtil.getParsingPath() + File.separator + ftpParsingUrl.getStationCode() + File.separator + ftpParsingUrl.getForecastManufactor() + File.separator + dateDir;
+                            File dirFile = checkGetPath(path);
+                            //下载文件到临时目录
+                            sftp.get(url + "/" + fileName, dirFile.getPath() + File.separator + fileName);
+                            File file = FileUtils.getFile(dirFile, fileName);
+                            if (file.length() <= 0) {
+                                continue;
+                            }
+
+                            //定义解析的类型,默认为错误(未知),成功后为文件类型,也会作为存储目录名
+                            FileAnalysisStatusDto fileAnalysisStatusDto = parsingFileService.parsingFile(file, ftpParsingUrl);
+                            parsingLog.setFileType(fileAnalysisStatusDto.getFileType());
+                            if (!"1".equals(fileAnalysisStatusDto.getStatus())) {
+                                // 解析失败
+                                file.delete();
+                                parsingLog.setParsingFileStatus("0");
+                                parsingLog.setParsingDescribe(fileAnalysisStatusDto.getMessage());
+                            } else {
+                                // 解析成功,删除ftp上的文件
+                                sftp.delFile(url + "/" + fileName);
+                                parsingLog.setParsingFileStatus("1");
+                                parsingLog.setParsingDescribe("文件解析成功");
+                            }
+                            parsingLogService.save(parsingLog);
+                        } catch (Exception e) {
+                            log.error(ftpParsingUrl.getStationCode() + "文件解析失败" + fileName, e);
+                        }
+                    }
+                }
+
+                log.info("{} sftp下载文件并解析执行完成", sftpParsingChannel.getChannelName());
+            } catch (Exception e) {
+                log.error("{} sftp下载文件并解析过程失败", sftpParsingChannel.getChannelName(), e);
+            }
+        }
+        sftpTool.close();
+
+    }
+
+    public static File checkGetPath(String path){
+        File file = new File(path);
+        if (!file.exists()){
+            file.mkdirs();
+        }
+
+        return file;
+    }
+
+}

+ 20 - 5
ipp-idp/src/main/java/com/jiayue/ipp/idp/util/FileUtil.java

@@ -71,18 +71,33 @@ public class FileUtil {
 
 
     public static String getParsingPath() {
-        return createParsingDir("parsing");
+        return createAllDir("parsing");
     }
 
 //    public static String getSendJyDataPath() {
-//        return createParsingDir("ftp");
+//        return createAllDir("ftp");
 //    }
 
-    private static String createParsingDir(String dir) {
-        String path = "/home/syjy/ipp/" + dir;
+    private static String createAllDir(String dir) {
+        String path = "";
+        if (System.getProperties().getProperty("file.separator").equals("\\")) {
+            path = new File(getResourceBasePath()).getParentFile().getParentFile().getParentFile().getAbsolutePath() + File.separator + dir;
+            try {
+                path = URLDecoder.decode(path, "UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                e.printStackTrace();
+            }
+            File file = new File(path);
+            if (!file.exists()) {
+                boolean b = file.mkdirs();
+                if (!b)
+                    log.error("目录创建失败" + path);
+            }
+        } else {
+            path = "/home/syjy/cpp/" + dir;
+        }
         return path;
     }
-
     /**
      * 获取项目根路径
      *

+ 32 - 0
ipp-idp/src/main/java/com/jiayue/ipp/idp/util/sftp/SftpTool.java

@@ -0,0 +1,32 @@
+package com.jiayue.ipp.idp.util.sftp;
+
+import cn.hutool.extra.ssh.JschUtil;
+import cn.hutool.extra.ssh.Sftp;
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.Session;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 一个临时的sftp控制类
+ */
+@Data
+@Slf4j
+public class SftpTool {
+    private JSch jsch = new JSch();
+    private ChannelSftp channel = null;
+    private Session sshSession = null;
+    private Sftp sftp = null;
+
+    public void close() {
+        if (this.sftp != null) {
+            log.info("sftp客户端关闭");
+            JschUtil.closeAll();
+            sftp.close();
+            channel.quit();
+            channel.disconnect();
+            sshSession.disconnect();
+        }
+    }
+}

+ 91 - 0
ipp-idp/src/main/java/com/jiayue/ipp/idp/util/sftp/SftpUtil.java

@@ -0,0 +1,91 @@
+package com.jiayue.ipp.idp.util.sftp;
+
+import cn.hutool.core.util.CharsetUtil;
+import cn.hutool.extra.ssh.Sftp;
+import com.jcraft.jsch.ChannelSftp;
+import com.jiayue.ipp.common.data.entity.an.ParsingChannel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.lang.reflect.Field;
+import java.nio.file.Files;
+import java.util.Properties;
+
+@Slf4j
+public class SftpUtil {
+
+    public static final String CHARSET_NAME = "UTF-8";
+
+
+    /**
+     *
+     * sftp 上传文件
+     *
+     * @param sftp
+     * @param destPathName  目标路径
+     * @param srcName   源路径
+     * @param fileName  上传文件名称
+     * @param isDel 是否删除源文件
+     */
+    public static void upload(
+            Sftp sftp,
+            String destPathName,
+            String srcName,
+            String fileName,
+            Boolean isDel
+    ) {
+        try (FileInputStream fis = new FileInputStream(srcName)) {
+            File file = new File(srcName);
+            sftp.getClient().put(fis, destPathName + "/" + fileName);
+            fis.close();
+            if (isDel){
+                Files.deleteIfExists(file.toPath());
+            }
+        } catch (Exception e) {
+            log.error("上传备份文件异常", e);
+        }
+    }
+
+
+
+    /**
+     * 创建sftp 并解析
+     *
+     * @param sftpChannel sftp通道信息
+     * @return sftp控制
+     */
+    public static SftpTool createSftp(
+            ParsingChannel sftpChannel
+    ) {
+        // SFTP方式上报
+        SftpTool sftpTool = new SftpTool();
+
+        try {
+            sftpTool.setSshSession(sftpTool.getJsch().getSession(sftpChannel.getUsername(), sftpChannel.getRemoteIp(), Integer.parseInt(sftpChannel.getRemotePort())));
+            sftpTool.getSshSession().setPassword(sftpChannel.getPassword());
+            sftpTool.getSshSession().setTimeout(30000);
+            Properties sshConfig = new Properties();
+            sshConfig.put("StrictHostKeyChecking", "no");
+            sftpTool.getSshSession().setConfig(sshConfig);
+            log.info("{} sftp开始连接...",sftpChannel.getChannelName());
+            sftpTool.getSshSession().connect();
+            sftpTool.setChannel((ChannelSftp) sftpTool.getSshSession().openChannel("sftp"));
+            sftpTool.getChannel().connect();
+            Class cl = ChannelSftp.class;
+            Field f = cl.getDeclaredField("server_version");
+            f.setAccessible(true);
+            f.set(sftpTool.getChannel(), 2);
+            sftpTool.getChannel().setFilenameEncoding(CHARSET_NAME);
+            sftpTool.setSftp(new Sftp(sftpTool.getChannel(), CharsetUtil.CHARSET_UTF_8));
+            log.info("{} sftp连接成功!!!",sftpChannel.getChannelName());
+        } catch (Exception e) {
+            log.error("{} sftp连接异常:" + e,sftpChannel.getChannelName());
+            if (sftpTool.getSftp() != null) {
+                sftpTool.close();
+            }
+        }
+        return sftpTool;
+    }
+
+}