123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- package com.jiayue.ipfcst.fileupload.service;
- import ch.qos.logback.classic.Logger;
- import cn.hutool.core.io.IoUtil;
- import cn.hutool.extra.ftp.Ftp;
- import cn.hutool.extra.ftp.FtpConfig;
- import cn.hutool.extra.ftp.FtpMode;
- import cn.hutool.extra.ssh.Sftp;
- import com.jiayue.ipfcst.common.data.constant.enums.AlarmTypeEnum;
- import com.jiayue.ipfcst.common.data.constant.enums.ChannelStatusEnum;
- import com.jiayue.ipfcst.common.data.entity.UploadFileChannel;
- import com.jiayue.ipfcst.common.data.entity.UploadFileLog;
- import com.jiayue.ipfcst.common.data.entity.UploadObject;
- import com.jiayue.ipfcst.common.data.entity.UploadURL;
- import com.jiayue.ipfcst.common.data.repository.UploadFileLogDetailRepository;
- import com.jiayue.ipfcst.common.data.repository.UploadFileLogRepository;
- import com.jiayue.ipfcst.common.data.repository.UploadURLRepository;
- import com.jiayue.ipfcst.console.service.SysAlarmService;
- import com.jiayue.ipfcst.fileupload.config.AppenderFactory;
- import com.jiayue.ipfcst.fileupload.util.FileConstant;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.LineNumberReader;
- import java.nio.charset.Charset;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.stream.Collectors;
- @Service
- @Slf4j
- public class FtpUploadService extends BaseUploadFileService{
- final
- UploadFileChannelService uploadFileChannelService;
- final
- UploadFileLogRepository uploadFileLogRepository;
- final
- UploadFileLogDetailRepository uploadFileLogDetailRepository;
- final
- UploadObjectService uploadObjectService;
- final
- FtpUploadTransactionService ftpUploadTransactionService;
- final
- AppenderFactory appenderFactory;
- final
- UploadURLRepository uploadURLRepository;
- final
- SysAlarmService sysAlarmService;
- public FtpUploadService(UploadFileChannelService uploadFileChannelService, UploadFileLogRepository uploadFileLogRepository, UploadFileLogDetailRepository uploadFileLogDetailRepository, UploadObjectService uploadObjectService, FtpUploadTransactionService ftpUploadTransactionService, AppenderFactory appenderFactory, UploadURLRepository uploadURLRepository, SysAlarmService sysAlarmService) {
- this.uploadFileChannelService = uploadFileChannelService;
- this.uploadFileLogRepository = uploadFileLogRepository;
- this.uploadFileLogDetailRepository = uploadFileLogDetailRepository;
- this.uploadObjectService = uploadObjectService;
- this.ftpUploadTransactionService = ftpUploadTransactionService;
- this.appenderFactory = appenderFactory;
- this.uploadURLRepository = uploadURLRepository;
- this.sysAlarmService = sysAlarmService;
- }
- /**
- * 开启FTP客户端上报文件
- */
- public void startFtpUpload(){
- List<UploadObject> uploadObjectList = uploadObjectService.get();
- // 过滤出ftp/sftp的上报对象
- uploadObjectList = uploadObjectList.stream().filter(s -> s.getUploadProtocolEnum().toString().equals("E4") || s.getUploadProtocolEnum().toString().equals("E5")).collect(Collectors.toList());
- if (uploadObjectList.size() > 0) {
- // 获取通道信息
- List<UploadFileChannel> uploadFileChannels = uploadFileChannelService.get();
- // 遍历ftp的上报对象
- for (UploadObject uploadObject : uploadObjectList) {
- //找出可用的ftp通道
- List<UploadFileChannel> uploadFileChannelList = uploadFileChannels.stream().filter(s -> s.getChannelStatusEnum().equals(ChannelStatusEnum.E1) && s.getUploadObjectId().equals(uploadObject.getId())).collect(Collectors.toList());
- // 创建通道对应的日志
- Logger logger = appenderFactory.getLogger(uploadObject.getUploadObjectName(), uploadObject.getUploadProtocolEnum().getMessage(), uploadObject.getObjectNo());
- for (UploadFileChannel uploadFileChannel : uploadFileChannelList) {
- List<UploadURL> uploadURLList = uploadURLRepository.findByUploadChannelId(uploadFileChannel.getId());
- if (uploadObject.getUploadProtocolEnum().toString().equals("E4")) {
- // FTP方式上报
- Ftp ftp = null;
- try {
- FtpMode mode;
- if ("E1".equals(uploadFileChannel.getFtpPassiveModeEnum().toString())) {
- // 主动模式
- mode = FtpMode.Active;
- } else {
- // 被动模式
- mode = FtpMode.Passive;
- }
- FtpConfig ftpConfig = new FtpConfig();
- ftpConfig.setHost(uploadFileChannel.getRemoteIp());
- ftpConfig.setPort(Integer.parseInt(uploadFileChannel.getRemotePort()));
- ftpConfig.setPassword(uploadFileChannel.getUploadPassword());
- ftpConfig.setUser(uploadFileChannel.getUploadUserName());
- ftpConfig.setCharset(Charset.forName(uploadObject.getUploadFileCharSetEnum().getMessage()));
- ftpConfig.setConnectionTimeout(5 * 1000L);
- try {
- ftp = new Ftp(ftpConfig, mode);
- logger.info(uploadFileChannel.getChannelName() + "ftp客户端连接成功");
- // 连接成功,设置通道状态
- FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "1");
- } catch (Exception e) {
- // 连接失败,设置通道状态
- FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "0");
- throw e;
- }
- // 获取缓存中的上报文件
- logger.debug("上报文件缓存数量:" + FileConstant.readyUploadFileMap.size());
- Map<String, UploadFileLog> fileMap = new HashMap<>(FileConstant.readyUploadFileMap);
- for (Map.Entry<String, UploadFileLog> entry : fileMap.entrySet()) {
- String[] keys = entry.getKey().split("@");
- if (keys[0].equals(uploadObject.getObjectNo())) {
- // 获取缓存中对应的上报对象的文件
- logger.info("在缓存中找到通道[" + uploadFileChannel.getChannelName() + "]文件:" + keys[2]);
- ftpUploadTransactionService.singleUpload(uploadObject, uploadFileChannel, entry, ftp, null, keys, logger, uploadURLList);
- }
- }
- } catch (Exception e) {
- logger.error(uploadFileChannel.getChannelName() + "上报失败", e);
- try {
- Process process = Runtime.getRuntime().exec("ping " + uploadFileChannel.getRemoteIp() + " -c 2");
- InputStreamReader r = new InputStreamReader(process.getInputStream());
- LineNumberReader returnData = new LineNumberReader(r);
- StringBuilder returnMsg = new StringBuilder();
- String line;
- while ((line = returnData.readLine()) != null) {
- returnMsg.append(line);
- }
- String errorInfo = "";
- if (returnMsg.indexOf("100% loss") != -1) {
- logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通.");
- errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通.";
- } else {
- logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接畅通.");
- errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接畅通.";
- }
- // 进行告警
- String name = "上报文件异常";
- String describe = "";
- String solution = "";
- sysAlarmService.saveSysAlarm(AlarmTypeEnum.E2, name, describe, errorInfo, solution,"");
- returnData.close();
- r.close();
- process.getInputStream().close();
- process.getOutputStream().close();
- process.destroy();
- } catch (IOException ex) {
- logger.error("ping对方ip执行失败", ex);
- }
- } finally {
- if (ftp != null) {
- IoUtil.close(ftp);
- logger.info(uploadFileChannel.getChannelName() + "ftp客户端关闭连接");
- }
- }
- } else {
- // SFTP方式上报
- Sftp sftp = null;
- try {
- try {
- sftp = new Sftp(uploadFileChannel.getRemoteIp(), Integer.parseInt(uploadFileChannel.getRemotePort()), uploadFileChannel.getUploadUserName(), uploadFileChannel.getUploadPassword());
- logger.info(uploadFileChannel.getChannelName() + ":sftp连接创建成功");
- // 连接成功,设置通道状态
- FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "1");
- } catch (Exception e) {
- logger.error(uploadFileChannel.getChannelName() + ":sftp连接失败", e);
- // 连接失败,设置通道状态
- FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "0");
- throw e;
- }
- // 获取缓存中的上报文件
- logger.debug("当前文件缓存数量:" + FileConstant.readyUploadFileMap.size());
- Map<String, UploadFileLog> fileMap = new HashMap<>(FileConstant.readyUploadFileMap);
- for (Map.Entry<String, UploadFileLog> entry : fileMap.entrySet()) {
- String[] keys = entry.getKey().split("@");
- if (keys[0].equals(uploadObject.getObjectNo())) {
- // 获取缓存中对应的上报对象的文件
- logger.info("在缓存中找到通道[" + uploadFileChannel.getChannelName() + "]文件:" + keys[2] + ",将文件送入到上报程序中。");
- ftpUploadTransactionService.singleUpload(uploadObject, uploadFileChannel, entry, null, sftp, keys, logger, uploadURLList);
- }
- }
- } catch (Exception e) {
- logger.error(uploadFileChannel.getChannelName() + "上报失败", e);
- try {
- Process process = Runtime.getRuntime().exec("ping " + uploadFileChannel.getRemoteIp() + " -c 2");
- InputStreamReader r = new InputStreamReader(process.getInputStream());
- LineNumberReader returnData = new LineNumberReader(r);
- StringBuilder returnMsg = new StringBuilder();
- String line;
- while ((line = returnData.readLine()) != null) {
- returnMsg.append(line);
- }
- String errorInfo = "";
- if (returnMsg.indexOf("100% loss") != -1) {
- logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通.");
- errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接不畅通.";
- } else {
- logger.info("与 " + uploadFileChannel.getRemoteIp() + " 连接畅通.");
- errorInfo = "与 " + uploadFileChannel.getRemoteIp() + " 连接畅通.";
- }
- // 进行告警
- String name = "上报文件异常";
- String describe = "";
- String solution = "";
- sysAlarmService.saveSysAlarm(AlarmTypeEnum.E2, name, describe, errorInfo, solution,"");
- returnData.close();
- r.close();
- process.getInputStream().close();
- process.getOutputStream().close();
- process.destroy();
- } catch (IOException ex) {
- logger.error("ping对方ip执行失败", ex);
- }
- } finally {
- if (sftp != null) {
- logger.info("sftp客户端关闭");
- sftp.close();
- }
- }
- }
- }
- }
- }
- }
- }
|