package com.jiayue.ipfcst.fileupload.service; import ch.qos.logback.classic.Logger; import cn.hutool.core.io.IoUtil; import cn.hutool.extra.ftp.Ftp; import cn.hutool.extra.ftp.FtpConfig; import cn.hutool.extra.ftp.FtpMode; import cn.hutool.extra.ssh.Sftp; import com.jiayue.ipfcst.common.data.constant.enums.AlarmTypeEnum; import com.jiayue.ipfcst.common.data.constant.enums.ChannelStatusEnum; import com.jiayue.ipfcst.common.data.entity.UploadFileChannel; import com.jiayue.ipfcst.common.data.entity.UploadFileLog; import com.jiayue.ipfcst.common.data.entity.UploadObject; import com.jiayue.ipfcst.common.data.entity.UploadURL; import com.jiayue.ipfcst.common.data.repository.UploadFileLogDetailRepository; import com.jiayue.ipfcst.common.data.repository.UploadFileLogRepository; import com.jiayue.ipfcst.common.data.repository.UploadURLRepository; import com.jiayue.ipfcst.console.service.SysAlarmService; import com.jiayue.ipfcst.fileupload.config.AppenderFactory; import com.jiayue.ipfcst.fileupload.util.FileConstant; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.io.IOException; import java.io.InputStreamReader; import java.io.LineNumberReader; import java.nio.charset.Charset; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @Service @Slf4j public class FtpUploadService extends BaseUploadFileService{ final UploadFileChannelService uploadFileChannelService; final UploadFileLogRepository uploadFileLogRepository; final UploadFileLogDetailRepository uploadFileLogDetailRepository; final UploadObjectService uploadObjectService; final FtpUploadTransactionService ftpUploadTransactionService; final AppenderFactory appenderFactory; final UploadURLRepository uploadURLRepository; final SysAlarmService sysAlarmService; public FtpUploadService(UploadFileChannelService uploadFileChannelService, UploadFileLogRepository uploadFileLogRepository, UploadFileLogDetailRepository uploadFileLogDetailRepository, UploadObjectService uploadObjectService, FtpUploadTransactionService ftpUploadTransactionService, AppenderFactory appenderFactory, UploadURLRepository uploadURLRepository, SysAlarmService sysAlarmService) { this.uploadFileChannelService = uploadFileChannelService; this.uploadFileLogRepository = uploadFileLogRepository; this.uploadFileLogDetailRepository = uploadFileLogDetailRepository; this.uploadObjectService = uploadObjectService; this.ftpUploadTransactionService = ftpUploadTransactionService; this.appenderFactory = appenderFactory; this.uploadURLRepository = uploadURLRepository; this.sysAlarmService = sysAlarmService; } /** * 开启FTP客户端上报文件 */ public void startFtpUpload(){ List uploadObjectList = uploadObjectService.get(); // 过滤出ftp/sftp的上报对象 uploadObjectList = uploadObjectList.stream().filter(s -> s.getUploadProtocolEnum().toString().equals("E4") || s.getUploadProtocolEnum().toString().equals("E5")).collect(Collectors.toList()); if (uploadObjectList.size() > 0) { // 获取通道信息 List uploadFileChannels = uploadFileChannelService.get(); // 遍历ftp的上报对象 for (UploadObject uploadObject : uploadObjectList) { //找出可用的ftp通道 List uploadFileChannelList = uploadFileChannels.stream().filter(s -> s.getChannelStatusEnum().equals(ChannelStatusEnum.E1) && s.getUploadObjectId().equals(uploadObject.getId())).collect(Collectors.toList()); // 创建通道对应的日志 Logger logger = appenderFactory.getLogger(uploadObject.getUploadObjectName(), uploadObject.getUploadProtocolEnum().getMessage(), uploadObject.getObjectNo()); for (UploadFileChannel uploadFileChannel : uploadFileChannelList) { List uploadURLList = uploadURLRepository.findByUploadChannelId(uploadFileChannel.getId()); if (uploadObject.getUploadProtocolEnum().toString().equals("E4")) { // FTP方式上报 Ftp ftp = null; try { FtpMode mode; if ("E1".equals(uploadFileChannel.getFtpPassiveModeEnum().toString())) { // 主动模式 mode = FtpMode.Active; } else { // 被动模式 mode = FtpMode.Passive; } FtpConfig ftpConfig = new FtpConfig(); ftpConfig.setHost(uploadFileChannel.getRemoteIp()); ftpConfig.setPort(Integer.parseInt(uploadFileChannel.getRemotePort())); ftpConfig.setPassword(uploadFileChannel.getUploadPassword()); ftpConfig.setUser(uploadFileChannel.getUploadUserName()); ftpConfig.setCharset(Charset.forName(uploadObject.getUploadFileCharSetEnum().getMessage())); ftpConfig.setConnectionTimeout(5 * 1000L); try { ftp = new Ftp(ftpConfig, mode); logger.info(uploadFileChannel.getChannelName() + "ftp客户端连接成功"); // 连接成功,设置通道状态 FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "1"); } catch (Exception e) { // 连接失败,设置通道状态 FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "0"); throw e; } // 获取缓存中的上报文件 logger.debug("上报文件缓存数量:" + FileConstant.readyUploadFileMap.size()); Map fileMap = new HashMap<>(FileConstant.readyUploadFileMap); for (Map.Entry entry : fileMap.entrySet()) { String[] keys = entry.getKey().split("@"); if (keys[0].equals(uploadObject.getObjectNo())) { // 获取缓存中对应的上报对象的文件 logger.info("在缓存中找到通道[" + uploadFileChannel.getChannelName() + "]文件:" + keys[2]); ftpUploadTransactionService.singleUpload(uploadObject, uploadFileChannel, entry, ftp, null, keys, logger, uploadURLList); } } } catch (Exception e) { logger.error(uploadFileChannel.getChannelName() + "上报失败", e); try { Process process = Runtime.getRuntime().exec("ping " + uploadFileChannel.getRemoteIp() + " -c 2"); InputStreamReader r = new InputStreamReader(process.getInputStream()); LineNumberReader returnData = new LineNumberReader(r); StringBuilder returnMsg = new StringBuilder(); String line; while ((line = returnData.readLine()) != null) { returnMsg.append(line); } String errorInfo = ""; if (returnMsg.indexOf("100% loss") != -1) { logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通."); errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通."; } else { logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接畅通."); errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接畅通."; } // 进行告警 String name = "上报文件异常"; String describe = ""; String solution = ""; sysAlarmService.saveSysAlarm(AlarmTypeEnum.E2, name, describe, errorInfo, solution,""); returnData.close(); r.close(); process.getInputStream().close(); process.getOutputStream().close(); process.destroy(); } catch (IOException ex) { logger.error("ping对方ip执行失败", ex); } } finally { if (ftp != null) { IoUtil.close(ftp); logger.info(uploadFileChannel.getChannelName() + "ftp客户端关闭连接"); } } } else { // SFTP方式上报 Sftp sftp = null; try { try { sftp = new Sftp(uploadFileChannel.getRemoteIp(), Integer.parseInt(uploadFileChannel.getRemotePort()), uploadFileChannel.getUploadUserName(), uploadFileChannel.getUploadPassword()); logger.info(uploadFileChannel.getChannelName() + ":sftp连接创建成功"); // 连接成功,设置通道状态 FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "1"); } catch (Exception e) { logger.error(uploadFileChannel.getChannelName() + ":sftp连接失败", e); // 连接失败,设置通道状态 FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "0"); throw e; } // 获取缓存中的上报文件 logger.debug("当前文件缓存数量:" + FileConstant.readyUploadFileMap.size()); Map fileMap = new HashMap<>(FileConstant.readyUploadFileMap); for (Map.Entry entry : fileMap.entrySet()) { String[] keys = entry.getKey().split("@"); if (keys[0].equals(uploadObject.getObjectNo())) { // 获取缓存中对应的上报对象的文件 logger.info("在缓存中找到通道[" + uploadFileChannel.getChannelName() + "]文件:" + keys[2] + ",将文件送入到上报程序中。"); ftpUploadTransactionService.singleUpload(uploadObject, uploadFileChannel, entry, null, sftp, keys, logger, uploadURLList); } } } catch (Exception e) { logger.error(uploadFileChannel.getChannelName() + "上报失败", e); try { Process process = Runtime.getRuntime().exec("ping " + uploadFileChannel.getRemoteIp() + " -c 2"); InputStreamReader r = new InputStreamReader(process.getInputStream()); LineNumberReader returnData = new LineNumberReader(r); StringBuilder returnMsg = new StringBuilder(); String line; while ((line = returnData.readLine()) != null) { returnMsg.append(line); } String errorInfo = ""; if (returnMsg.indexOf("100% loss") != -1) { logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通."); errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通."; } else { logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接畅通."); errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接畅通."; } // 进行告警 String name = "上报文件异常"; String describe = ""; String solution = ""; sysAlarmService.saveSysAlarm(AlarmTypeEnum.E2, name, describe, errorInfo, solution,""); returnData.close(); r.close(); process.getInputStream().close(); process.getOutputStream().close(); process.destroy(); } catch (IOException ex) { logger.error("ping对方ip执行失败", ex); } } finally { if (sftp != null) { logger.info("sftp客户端关闭"); sftp.close(); } } } } } } } }