FtpUploadService.java 12 KB


  1. package com.jiayue.ipfcst.fileupload.service;
  2. import ch.qos.logback.classic.Logger;
  3. import cn.hutool.core.io.IoUtil;
  4. import cn.hutool.extra.ftp.Ftp;
  5. import cn.hutool.extra.ftp.FtpConfig;
  6. import cn.hutool.extra.ftp.FtpMode;
  7. import cn.hutool.extra.ssh.Sftp;
  8. import com.jiayue.ipfcst.common.data.constant.enums.AlarmTypeEnum;
  9. import com.jiayue.ipfcst.common.data.constant.enums.ChannelStatusEnum;
  10. import com.jiayue.ipfcst.common.data.entity.UploadFileChannel;
  11. import com.jiayue.ipfcst.common.data.entity.UploadFileLog;
  12. import com.jiayue.ipfcst.common.data.entity.UploadObject;
  13. import com.jiayue.ipfcst.common.data.entity.UploadURL;
  14. import com.jiayue.ipfcst.common.data.repository.UploadFileLogDetailRepository;
  15. import com.jiayue.ipfcst.common.data.repository.UploadFileLogRepository;
  16. import com.jiayue.ipfcst.common.data.repository.UploadURLRepository;
  17. import com.jiayue.ipfcst.console.service.SysAlarmService;
  18. import com.jiayue.ipfcst.fileupload.config.AppenderFactory;
  19. import com.jiayue.ipfcst.fileupload.util.FileConstant;
  20. import lombok.extern.slf4j.Slf4j;
  21. import org.springframework.stereotype.Service;
  22. import java.io.IOException;
  23. import java.io.InputStreamReader;
  24. import java.io.LineNumberReader;
  25. import java.nio.charset.Charset;
  26. import java.util.HashMap;
  27. import java.util.List;
  28. import java.util.Map;
  29. import java.util.stream.Collectors;
  30. @Service
  31. @Slf4j
  32. public class FtpUploadService extends BaseUploadFileService{
  33. final
  34. UploadFileChannelService uploadFileChannelService;
  35. final
  36. UploadFileLogRepository uploadFileLogRepository;
  37. final
  38. UploadFileLogDetailRepository uploadFileLogDetailRepository;
  39. final
  40. UploadObjectService uploadObjectService;
  41. final
  42. FtpUploadTransactionService ftpUploadTransactionService;
  43. final
  44. AppenderFactory appenderFactory;
  45. final
  46. UploadURLRepository uploadURLRepository;
  47. final
  48. SysAlarmService sysAlarmService;
  49. public FtpUploadService(UploadFileChannelService uploadFileChannelService, UploadFileLogRepository uploadFileLogRepository, UploadFileLogDetailRepository uploadFileLogDetailRepository, UploadObjectService uploadObjectService, FtpUploadTransactionService ftpUploadTransactionService, AppenderFactory appenderFactory, UploadURLRepository uploadURLRepository, SysAlarmService sysAlarmService) {
  50. this.uploadFileChannelService = uploadFileChannelService;
  51. this.uploadFileLogRepository = uploadFileLogRepository;
  52. this.uploadFileLogDetailRepository = uploadFileLogDetailRepository;
  53. this.uploadObjectService = uploadObjectService;
  54. this.ftpUploadTransactionService = ftpUploadTransactionService;
  55. this.appenderFactory = appenderFactory;
  56. this.uploadURLRepository = uploadURLRepository;
  57. this.sysAlarmService = sysAlarmService;
  58. }
  59. /**
  60. * 开启FTP客户端上报文件
  61. */
  62. public void startFtpUpload(){
  63. List<UploadObject> uploadObjectList = uploadObjectService.get();
  64. // 过滤出ftp/sftp的上报对象
  65. uploadObjectList = uploadObjectList.stream().filter(s -> s.getUploadProtocolEnum().toString().equals("E4") || s.getUploadProtocolEnum().toString().equals("E5")).collect(Collectors.toList());
  66. if (uploadObjectList.size() > 0) {
  67. // 获取通道信息
  68. List<UploadFileChannel> uploadFileChannels = uploadFileChannelService.get();
  69. // 遍历ftp的上报对象
  70. for (UploadObject uploadObject : uploadObjectList) {
  71. //找出可用的ftp通道
  72. List<UploadFileChannel> uploadFileChannelList = uploadFileChannels.stream().filter(s -> s.getChannelStatusEnum().equals(ChannelStatusEnum.E1) && s.getUploadObjectId().equals(uploadObject.getId())).collect(Collectors.toList());
  73. // 创建通道对应的日志
  74. Logger logger = appenderFactory.getLogger(uploadObject.getUploadObjectName(), uploadObject.getUploadProtocolEnum().getMessage(), uploadObject.getObjectNo());
  75. for (UploadFileChannel uploadFileChannel : uploadFileChannelList) {
  76. List<UploadURL> uploadURLList = uploadURLRepository.findByUploadChannelId(uploadFileChannel.getId());
  77. if (uploadObject.getUploadProtocolEnum().toString().equals("E4")) {
  78. // FTP方式上报
  79. Ftp ftp = null;
  80. try {
  81. FtpMode mode;
  82. if ("E1".equals(uploadFileChannel.getFtpPassiveModeEnum().toString())) {
  83. // 主动模式
  84. mode = FtpMode.Active;
  85. } else {
  86. // 被动模式
  87. mode = FtpMode.Passive;
  88. }
  89. FtpConfig ftpConfig = new FtpConfig();
  90. ftpConfig.setHost(uploadFileChannel.getRemoteIp());
  91. ftpConfig.setPort(Integer.parseInt(uploadFileChannel.getRemotePort()));
  92. ftpConfig.setPassword(uploadFileChannel.getUploadPassword());
  93. ftpConfig.setUser(uploadFileChannel.getUploadUserName());
  94. ftpConfig.setCharset(Charset.forName(uploadObject.getUploadFileCharSetEnum().getMessage()));
  95. ftpConfig.setConnectionTimeout(5 * 1000L);
  96. try {
  97. ftp = new Ftp(ftpConfig, mode);
  98. logger.info(uploadFileChannel.getChannelName() + "ftp客户端连接成功");
  99. // 连接成功,设置通道状态
  100. FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "1");
  101. } catch (Exception e) {
  102. // 连接失败,设置通道状态
  103. FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "0");
  104. throw e;
  105. }
  106. // 获取缓存中的上报文件
  107. logger.debug("上报文件缓存数量:" + FileConstant.readyUploadFileMap.size());
  108. Map<String, UploadFileLog> fileMap = new HashMap<>(FileConstant.readyUploadFileMap);
  109. for (Map.Entry<String, UploadFileLog> entry : fileMap.entrySet()) {
  110. String[] keys = entry.getKey().split("@");
  111. if (keys[0].equals(uploadObject.getObjectNo())) {
  112. // 获取缓存中对应的上报对象的文件
  113. logger.info("在缓存中找到通道[" + uploadFileChannel.getChannelName() + "]文件:" + keys[2]);
  114. ftpUploadTransactionService.singleUpload(uploadObject, uploadFileChannel, entry, ftp, null, keys, logger, uploadURLList);
  115. }
  116. }
  117. } catch (Exception e) {
  118. logger.error(uploadFileChannel.getChannelName() + "上报失败", e);
  119. try {
  120. Process process = Runtime.getRuntime().exec("ping " + uploadFileChannel.getRemoteIp() + " -c 2");
  121. InputStreamReader r = new InputStreamReader(process.getInputStream());
  122. LineNumberReader returnData = new LineNumberReader(r);
  123. StringBuilder returnMsg = new StringBuilder();
  124. String line;
  125. while ((line = returnData.readLine()) != null) {
  126. returnMsg.append(line);
  127. }
  128. String errorInfo = "";
  129. if (returnMsg.indexOf("100% loss") != -1) {
  130. logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通.");
  131. errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通.";
  132. } else {
  133. logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接畅通.");
  134. errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接畅通.";
  135. }
  136. // 进行告警
  137. String name = "上报文件异常";
  138. String describe = "";
  139. String solution = "";
  140. sysAlarmService.saveSysAlarm(AlarmTypeEnum.E2, name, describe, errorInfo, solution,"");
  141. returnData.close();
  142. r.close();
  143. process.getInputStream().close();
  144. process.getOutputStream().close();
  145. process.destroy();
  146. } catch (IOException ex) {
  147. logger.error("ping对方ip执行失败", ex);
  148. }
  149. } finally {
  150. if (ftp != null) {
  151. IoUtil.close(ftp);
  152. logger.info(uploadFileChannel.getChannelName() + "ftp客户端关闭连接");
  153. }
  154. }
  155. } else {
  156. // SFTP方式上报
  157. Sftp sftp = null;
  158. try {
  159. try {
  160. sftp = new Sftp(uploadFileChannel.getRemoteIp(), Integer.parseInt(uploadFileChannel.getRemotePort()), uploadFileChannel.getUploadUserName(), uploadFileChannel.getUploadPassword());
  161. logger.info(uploadFileChannel.getChannelName() + ":sftp连接创建成功");
  162. // 连接成功,设置通道状态
  163. FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "1");
  164. } catch (Exception e) {
  165. logger.error(uploadFileChannel.getChannelName() + ":sftp连接失败", e);
  166. // 连接失败,设置通道状态
  167. FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "0");
  168. throw e;
  169. }
  170. // 获取缓存中的上报文件
  171. logger.debug("当前文件缓存数量:" + FileConstant.readyUploadFileMap.size());
  172. Map<String, UploadFileLog> fileMap = new HashMap<>(FileConstant.readyUploadFileMap);
  173. for (Map.Entry<String, UploadFileLog> entry : fileMap.entrySet()) {
  174. String[] keys = entry.getKey().split("@");
  175. if (keys[0].equals(uploadObject.getObjectNo())) {
  176. // 获取缓存中对应的上报对象的文件
  177. logger.info("在缓存中找到通道[" + uploadFileChannel.getChannelName() + "]文件:" + keys[2] + ",将文件送入到上报程序中。");
  178. ftpUploadTransactionService.singleUpload(uploadObject, uploadFileChannel, entry, null, sftp, keys, logger, uploadURLList);
  179. }
  180. }
  181. } catch (Exception e) {
  182. logger.error(uploadFileChannel.getChannelName() + "上报失败", e);
  183. try {
  184. Process process = Runtime.getRuntime().exec("ping " + uploadFileChannel.getRemoteIp() + " -c 2");
  185. InputStreamReader r = new InputStreamReader(process.getInputStream());
  186. LineNumberReader returnData = new LineNumberReader(r);
  187. StringBuilder returnMsg = new StringBuilder();
  188. String line;
  189. while ((line = returnData.readLine()) != null) {
  190. returnMsg.append(line);
  191. }
  192. String errorInfo = "";
  193. if (returnMsg.indexOf("100% loss") != -1) {
  194. logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通.");
  195. errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通.";
  196. } else {
  197. logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接畅通.");
  198. errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接畅通.";
  199. }
  200. // 进行告警
  201. String name = "上报文件异常";
  202. String describe = "";
  203. String solution = "";
  204. sysAlarmService.saveSysAlarm(AlarmTypeEnum.E2, name, describe, errorInfo, solution,"");
  205. returnData.close();
  206. r.close();
  207. process.getInputStream().close();
  208. process.getOutputStream().close();
  209. process.destroy();
  210. } catch (IOException ex) {
  211. logger.error("ping对方ip执行失败", ex);
  212. }
  213. } finally {
  214. if (sftp != null) {
  215. logger.info("sftp客户端关闭");
  216. sftp.close();
  217. }
  218. }
  219. }
  220. }
  221. }
  222. }
  223. }
  224. }