|
@@ -72,18 +72,18 @@ public class FTPService {
|
|
*/
|
|
*/
|
|
public void ftp() {
|
|
public void ftp() {
|
|
|
|
|
|
- List<ParsingChannel> ParsingChannels = parsingChannelService.list();
|
|
|
|
|
|
+ List<ParsingChannel> ftpParsingChannels = parsingChannelService.list();
|
|
List<ParsingUrl> ftpParsingUrls = parsingUrlService.list();
|
|
List<ParsingUrl> ftpParsingUrls = parsingUrlService.list();
|
|
|
|
|
|
//循环ftp解析通道
|
|
//循环ftp解析通道
|
|
- for (ParsingChannel ParsingChannel : ParsingChannels) {
|
|
|
|
|
|
+ for (ParsingChannel ftpParsingChannel : ftpParsingChannels) {
|
|
|
|
|
|
//创建ftp配置
|
|
//创建ftp配置
|
|
FtpConfig ftpConfig = new FtpConfig();
|
|
FtpConfig ftpConfig = new FtpConfig();
|
|
- ftpConfig.setHost(ParsingChannel.getRemoteIp());
|
|
|
|
- ftpConfig.setPort(Integer.parseInt(ParsingChannel.getRemotePort()));
|
|
|
|
- ftpConfig.setPassword(ParsingChannel.getPassword());
|
|
|
|
- ftpConfig.setUser(ParsingChannel.getUsername());
|
|
|
|
|
|
+ ftpConfig.setHost(ftpParsingChannel.getRemoteIp());
|
|
|
|
+ ftpConfig.setPort(Integer.parseInt(ftpParsingChannel.getRemotePort()));
|
|
|
|
+ ftpConfig.setPassword(ftpParsingChannel.getPassword());
|
|
|
|
+ ftpConfig.setUser(ftpParsingChannel.getUsername());
|
|
//字符集使用UTF-8
|
|
//字符集使用UTF-8
|
|
ftpConfig.setCharset(Charset.forName("UTF-8"));
|
|
ftpConfig.setCharset(Charset.forName("UTF-8"));
|
|
ftpConfig.setConnectionTimeout(5 * 1000L);
|
|
ftpConfig.setConnectionTimeout(5 * 1000L);
|
|
@@ -94,13 +94,21 @@ public class FTPService {
|
|
//判断是否连接成功,成功后业务继续
|
|
//判断是否连接成功,成功后业务继续
|
|
if (ftp != null) {
|
|
if (ftp != null) {
|
|
//过滤出当前解析通道下在使用的《 解析路径 》
|
|
//过滤出当前解析通道下在使用的《 解析路径 》
|
|
- List<ParsingUrl> ftpParsingUrlList = ftpParsingUrls.stream().filter(s -> s.getCId().equals(ParsingChannel.getId()) && s.getUrlStatus().equals("1")).collect(Collectors.toList());
|
|
|
|
|
|
+ List<ParsingUrl> ftpParsingUrlList = ftpParsingUrls.stream().filter(s -> s.getCId().equals(ftpParsingChannel.getId()) && s.getUrlStatus().equals("1")).collect(Collectors.toList());
|
|
|
|
|
|
try {
|
|
try {
|
|
//遍历解析路径,对文件进行解析
|
|
//遍历解析路径,对文件进行解析
|
|
- for (ParsingUrl parsingUrl : ftpParsingUrlList) {
|
|
|
|
|
|
+ for (ParsingUrl ftpParsingUrl : ftpParsingUrlList) {
|
|
|
|
+
|
|
|
|
+ String url = ftpParsingUrl.getUrl();
|
|
|
|
+
|
|
|
|
+ String path = FileUtil.getParsingPath() + File.separator + ftpParsingUrl.getForecastManufactor();
|
|
|
|
+
|
|
|
|
+ File dirFile = new File(path);
|
|
|
|
+ if (!dirFile.exists()) {
|
|
|
|
+ dirFile.mkdirs();
|
|
|
|
+ }
|
|
|
|
|
|
- String url = parsingUrl.getUrl();
|
|
|
|
// String bakPath = FileUtil.getParsingPath() + File.separator + "bak" + File.separator + simpleDateFormat.format(new Date()) + File.separator + ftpParsingUrl.getForecastManufactor() + File.separator;
|
|
// String bakPath = FileUtil.getParsingPath() + File.separator + "bak" + File.separator + simpleDateFormat.format(new Date()) + File.separator + ftpParsingUrl.getForecastManufactor() + File.separator;
|
|
// File bakPathFile = new File(bakPath);
|
|
// File bakPathFile = new File(bakPath);
|
|
// if (!bakPathFile.exists()) {
|
|
// if (!bakPathFile.exists()) {
|
|
@@ -108,51 +116,42 @@ public class FTPService {
|
|
// }
|
|
// }
|
|
|
|
|
|
if (ftp.existFile(url)) {
|
|
if (ftp.existFile(url)) {
|
|
- ftp.cd(FileUtil.getParsingPath());
|
|
|
|
- FTPFile[] fileNames = ftp.lsFiles(FileUtil.getParsingPath());
|
|
|
|
-
|
|
|
|
- // 打印文件名列表
|
|
|
|
- for (FTPFile ftpFile : fileNames) {
|
|
|
|
- System.out.println(ftpFile.getName());
|
|
|
|
- }
|
|
|
|
|
|
+ List<FTPFile> ftpFiles = ftp.lsFiles(url, new Filter<FTPFile>() {
|
|
|
|
+ @Override
|
|
|
|
+ public boolean accept(FTPFile ftpFile) {
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
|
|
//遍历文件
|
|
//遍历文件
|
|
- for (FTPFile fTPFile : fileNames) {
|
|
|
|
|
|
+ for (FTPFile fTPFile : ftpFiles) {
|
|
if (!fTPFile.isDirectory()) {
|
|
if (!fTPFile.isDirectory()) {
|
|
String fileName = fTPFile.getName();
|
|
String fileName = fTPFile.getName();
|
|
ParsingLog parsingLog = new ParsingLog();
|
|
ParsingLog parsingLog = new ParsingLog();
|
|
parsingLog.setFileName(fileName);
|
|
parsingLog.setFileName(fileName);
|
|
- parsingLog.setStationCode(parsingUrl.getStationCode());
|
|
|
|
|
|
+ parsingLog.setStationCode(ftpParsingUrl.getStationCode());
|
|
parsingLog.setParsingTime(new Date());
|
|
parsingLog.setParsingTime(new Date());
|
|
|
|
|
|
- String path = FileUtil.getParsingPath() + File.separator + "bak";
|
|
|
|
-
|
|
|
|
- File dirFile = new File(path);
|
|
|
|
- if (!dirFile.exists()) {
|
|
|
|
- dirFile.mkdirs();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
//下载文件到临时目录
|
|
//下载文件到临时目录
|
|
ftp.download(url, fileName, dirFile);
|
|
ftp.download(url, fileName, dirFile);
|
|
- ftp.delFile(fileName);
|
|
|
|
File file = FileUtils.getFile(dirFile, fileName);
|
|
File file = FileUtils.getFile(dirFile, fileName);
|
|
|
|
|
|
//定义解析的类型,默认为错误(未知),成功后为文件类型,也会作为存储目录名
|
|
//定义解析的类型,默认为错误(未知),成功后为文件类型,也会作为存储目录名
|
|
String type = ParsingConstant.FAIL;
|
|
String type = ParsingConstant.FAIL;
|
|
if (fileName.endsWith("status.json")) {
|
|
if (fileName.endsWith("status.json")) {
|
|
- type = parsingStatusFile(file, parsingUrl);
|
|
|
|
|
|
+ type = parsingStatusFile(file, ftpParsingUrl);
|
|
} else {
|
|
} else {
|
|
- type = parsingFileService.parsingFile(file, parsingUrl);
|
|
|
|
|
|
+ type = parsingFileService.parsingFile(file, ftpParsingUrl);
|
|
|
|
|
|
}
|
|
}
|
|
if (!type.startsWith(ParsingConstant.FAIL)) {
|
|
if (!type.startsWith(ParsingConstant.FAIL)) {
|
|
parsingLog.setFileType(type);
|
|
parsingLog.setFileType(type);
|
|
- //minio路径
|
|
|
|
- String foldPath = "/" + simpleDateFormat.format(new Date())
|
|
|
|
- + "/" + parsingUrl.getStationCode()
|
|
|
|
- + "/" + type
|
|
|
|
- + "/" + parsingUrl.getForecastManufactor()
|
|
|
|
- + "/" + file.getName();
|
|
|
|
|
|
+// //minio路径
|
|
|
|
+// String foldPath = "/" + simpleDateFormat.format(new Date())
|
|
|
|
+// + "/" + ftpParsingUrl.getStationCode()
|
|
|
|
+// + "/" + type
|
|
|
|
+// + "/" + ftpParsingUrl.getForecastManufactor()
|
|
|
|
+// + "/" + file.getName();
|
|
|
|
|
|
//将文件上传到minIO
|
|
//将文件上传到minIO
|
|
// if (minioUp(foldPath, file)) {
|
|
// if (minioUp(foldPath, file)) {
|
|
@@ -170,44 +169,44 @@ public class FTPService {
|
|
// daMinioFileMapper.insert(daMinioFile);
|
|
// daMinioFileMapper.insert(daMinioFile);
|
|
//
|
|
//
|
|
//// ftp.changeWorkingDirectory("/lshrabbitMQ/hw_data");
|
|
//// ftp.changeWorkingDirectory("/lshrabbitMQ/hw_data");
|
|
-//
|
|
|
|
-// boolean b = ftp.delFile(url + "/" + fileName);
|
|
|
|
-// if (b) {
|
|
|
|
-// FileUtils.forceDelete(file);
|
|
|
|
-//// FileUtils.moveFile(file, new File(bakPath + File.separator + fileName));
|
|
|
|
-// parsingLog.setParsingFileStatus("1");
|
|
|
|
-// parsingLog.setParsingDescribe("文件解析成功");
|
|
|
|
-// } else {
|
|
|
|
-// file.delete();
|
|
|
|
-// log.error(ParsingChannel.getChannelName() + "ftp文件删除失败,本地文件删除,等待下次下载");
|
|
|
|
-// parsingLog.setParsingFileStatus("0");
|
|
|
|
-// parsingLog.setParsingDescribe(ParsingChannel.getChannelName() + "ftp文件删除失败,本地文件删除,等待下次下载");
|
|
|
|
-// }
|
|
|
|
-// } else {
|
|
|
|
-// file.delete();
|
|
|
|
-// log.error("minio上传失败,本地文件删除,等待下次下载");
|
|
|
|
-// parsingLog.setParsingFileStatus("0");
|
|
|
|
-// parsingLog.setParsingDescribe("minio上传失败,本地文件删除,等待下次下载");
|
|
|
|
-// }
|
|
|
|
- } else {
|
|
|
|
- parsingLog.setParsingFileStatus("0");
|
|
|
|
- //失败情况下会返回失败信息
|
|
|
|
- parsingLog.setParsingDescribe(type);
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ boolean b = ftp.delFile(url + "/" + fileName);
|
|
|
|
+ if (b) {
|
|
|
|
+ FileUtils.forceDelete(file);
|
|
|
|
+// FileUtils.moveFile(file, new File(bakPath + File.separator + fileName));
|
|
|
|
+ parsingLog.setParsingFileStatus("1");
|
|
|
|
+ parsingLog.setParsingDescribe("文件解析成功");
|
|
|
|
+ } else {
|
|
|
|
+ file.delete();
|
|
|
|
+ log.error(ftpParsingChannel.getChannelName() + "ftp文件删除失败,本地文件删除,等待下次下载");
|
|
|
|
+ parsingLog.setParsingFileStatus("0");
|
|
|
|
+ parsingLog.setParsingDescribe(ftpParsingChannel.getChannelName() + "ftp文件删除失败,本地文件删除,等待下次下载");
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ file.delete();
|
|
|
|
+ log.error("minio上传失败,本地文件删除,等待下次下载");
|
|
|
|
+ parsingLog.setParsingFileStatus("0");
|
|
|
|
+ parsingLog.setParsingDescribe("minio上传失败,本地文件删除,等待下次下载");
|
|
|
|
+ }
|
|
|
|
+// } else {
|
|
|
|
+// parsingLog.setParsingFileStatus("0");
|
|
|
|
+// //失败情况下会返回失败信息
|
|
|
|
+// parsingLog.setParsingDescribe(type);
|
|
|
|
+// }
|
|
parsingLogService.save(parsingLog);
|
|
parsingLogService.save(parsingLog);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- log.info("{}ftp下载文件并解析执行完成", ParsingChannel.getChannelName());
|
|
|
|
|
|
+ log.info("{}ftp下载文件并解析执行完成", ftpParsingChannel.getChannelName());
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- log.error("{}ftp下载文件并解析过程失败", ParsingChannel.getChannelName());
|
|
|
|
|
|
+ log.error("{}ftp下载文件并解析过程失败", ftpParsingChannel.getChannelName());
|
|
e.printStackTrace();
|
|
e.printStackTrace();
|
|
} finally {
|
|
} finally {
|
|
try {
|
|
try {
|
|
if (ftp != null) {
|
|
if (ftp != null) {
|
|
ftp.close();
|
|
ftp.close();
|
|
- log.info("{}ftp连接关闭,对端ip:{}", ParsingChannel.getChannelName(), ParsingChannel.getRemoteIp());
|
|
|
|
|
|
+ log.info("{}ftp连接关闭,对端ip:{}", ftpParsingChannel.getChannelName(), ftpParsingChannel.getRemoteIp());
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
e.printStackTrace();
|
|
e.printStackTrace();
|