Преглед изворни кода

新增上报对象和通道

xusl пре 3 година
родитељ
комит
46314bac22
24 измењених фајлова са 2868 додато и 80 уклоњено
  1. 148 0
      ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/entity/UploadFileChannel.java
  2. 79 0
      ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/entity/UploadFileLogDetail.java
  3. 82 0
      ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/entity/UploadObject.java
  4. 52 0
      ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/entity/UploadURL.java
  5. 22 0
      ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/repository/UploadFileChannelRepository.java
  6. 14 0
      ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/repository/UploadObjectRepository.java
  7. 14 0
      ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/repository/UploadURLRepository.java
  8. 287 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/Base102Service.java
  9. 12 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/NettyParent.java
  10. 225 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102Decoder.java
  11. 21 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102Encoder.java
  12. 34 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102InitChannel.java
  13. 422 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102NMService.java
  14. 347 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102StandardService.java
  15. 276 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102TransitHandler.java
  16. 67 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/UploadFileNettyServer.java
  17. 118 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/config/AppenderFactory.java
  18. 120 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/config/FileUploadConfig.java
  19. 17 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/dto/Validate102Dto.java
  20. 88 80
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/service/BaseUploadFileService.java
  21. 223 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/service/UploadFileChannelService.java
  22. 114 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/service/UploadObjectService.java
  23. 48 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/util/FileConstant.java
  24. 38 0
      ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/util/FileMutableInteger.java

+ 148 - 0
ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/entity/UploadFileChannel.java

@@ -0,0 +1,148 @@
+package com.jiayue.ipfcst.common.data.entity;
+
+import com.jiayue.ipfcst.common.data.abst.AbstractBaseEntity;
+import com.jiayue.ipfcst.common.data.constant.enums.*;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.hibernate.annotations.GenericGenerator;
+import org.springframework.core.annotation.Order;
+
+import javax.persistence.*;
+
+/**
+ * 上报文件通道
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+@Entity
+public class UploadFileChannel extends AbstractBaseEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    @Id
+    @Order(1)
+    @GeneratedValue(strategy = GenerationType.AUTO, generator = "myid")
+    @GenericGenerator(name = "myid", strategy = "com.jiayue.ipfcst.common.data.entity.id.CustomIDGenerator")
+    private Integer id;
+
+    /**
+     * 上报对象ID
+     */
+    @Column
+    private Integer uploadObjectId;
+
+    /**
+     * 通道名称
+     */
+    @Column
+    private String channelName;
+
+    /**
+     * 通道使用状态
+     */
+    @Column
+    @Enumerated(EnumType.STRING)
+    private ChannelStatusEnum channelStatusEnum;
+
+    /**
+     * 本地IP
+     */
+    @Column
+    private String localIp;
+
+    /**
+     * 本地端口
+     */
+    @Column
+    private String localPort;
+
+    /**
+     * 远端IP
+     */
+    @Column
+    private String remoteIp;
+
+    /**
+     * 远端端口
+     */
+    @Column
+    private String remotePort;
+
+    /**
+     * 上报文件名称长度
+     */
+    @Column
+    @Enumerated(EnumType.STRING)
+    private UploadFileNameLengthEnum uploadFileNameLengthEnum;
+
+    /**
+     * 上报文件单次传输字节
+     */
+    @Column
+    @Enumerated(EnumType.STRING)
+    private UploadFileSingleByteEnum uploadFileSingleByteEnum;
+
+    /**
+     * FTP/SFTP用户名/102山东
+     */
+    @Column
+    private String uploadUserName;
+
+    /**
+     * FTP/SFTP密码/102山东
+     */
+    @Column
+    private String uploadPassword;
+
+    /**
+     * FTP/SFTP服务端路径
+     */
+    @Column
+    private String pathService;
+
+    /**
+     * SFTP连接方式
+     */
+    @Enumerated(EnumType.STRING)
+    private SftpConnectModeEnum sftpConnectModeEnum;
+
+    /**
+     * SFTP密钥路径
+     */
+    @Column
+    private String keyFilePath;
+
+    /**
+     * SFTP密钥密码
+     */
+    @Column
+    private String passPhrase;
+
+    /**
+     * 通道备注
+     */
+    @Column
+    private String channelRemarks;
+
+    /**
+     * FTP模式
+     */
+    @Column
+    @Enumerated(EnumType.STRING)
+    private FtpPassiveModeEnum ftpPassiveModeEnum;
+
+    /**
+     * 场站编号(运维标识)
+     */
+    @Column
+    private String stationCode;
+
+
+    /**
+     * 备用字段C 1、多个路径 0和空代表公用一个路径
+     * backupC 代表ftp模式下是否有多个上报路径
+     */
+}

+ 79 - 0
ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/entity/UploadFileLogDetail.java

@@ -0,0 +1,79 @@
+package com.jiayue.ipfcst.common.data.entity;
+
+import com.jiayue.ipfcst.common.data.abst.AbstractBaseEntity;
+import com.jiayue.ipfcst.common.data.constant.enums.FileStatusEnum;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.hibernate.annotations.GenericGenerator;
+import org.springframework.core.annotation.Order;
+
+import javax.persistence.*;
+
+
+/**
+ * 上报文件日志明细
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+@Entity
+public class UploadFileLogDetail extends AbstractBaseEntity {
+    private static final long serialVersionUID = 1L;
+
+    @Id
+    @Order(1)
+    @GeneratedValue(strategy = GenerationType.AUTO, generator = "myid")
+    @GenericGenerator(name = "myid", strategy = "com.jiayue.ipfcst.common.data.entity.id.CustomIDGenerator")
+    private Integer id;
+
+    /**
+     * 上报对象名称(记录哪个通道上报的文件)
+     */
+    @Column
+    private String uploadObjectName;
+
+    /**
+     * 通道主键ID(记录哪个通道上报的文件)
+     */
+    @Column
+    private Integer uploadChanneId;
+
+    /**
+     * 上报通道名称(记录哪个通道上报的文件)
+     */
+    @Column
+    private String uploadChannelName;
+
+    /**
+     * 文件日志主键ID
+     */
+    @Column
+    private Integer uploadFileLogId;
+
+    /**
+     * 文件名称
+     */
+    @Column
+    private String fileName;
+
+    /**
+     * 交互时间
+     */
+    @Column
+    private Long uploadTime;
+
+    /**
+     * 上报状态
+     */
+    @Column
+    @Enumerated(EnumType.STRING)
+    private FileStatusEnum fileStatusEnum;
+
+    /**
+     * 失败原因
+     */
+    @Column
+    private String uploadFailureReason;
+}

+ 82 - 0
ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/entity/UploadObject.java

@@ -0,0 +1,82 @@
+package com.jiayue.ipfcst.common.data.entity;
+
+import com.jiayue.ipfcst.common.data.abst.AbstractBaseEntity;
+import com.jiayue.ipfcst.common.data.constant.enums.UploadFileCharSetEnum;
+import com.jiayue.ipfcst.common.data.constant.enums.UploadProtocolEnum;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.hibernate.annotations.GenericGenerator;
+import org.springframework.core.annotation.Order;
+
+import javax.persistence.*;
+
+/**
+ * 上报对象
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+@Entity
+public class UploadObject extends AbstractBaseEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    @Id
+    @Order(1)
+    @GeneratedValue(strategy = GenerationType.AUTO, generator = "myid")
+    @GenericGenerator(name = "myid", strategy = "com.jiayue.ipfcst.common.data.entity.id.CustomIDGenerator")
+    private Integer id;
+
+    /**
+     * 上报对象名称
+     */
+    @Column
+    private String uploadObjectName;
+
+    /**
+     * 上报对象编号
+     */
+    @Column
+    private String objectNo;
+
+    /**
+     * 上报文件类型 使用","分割
+     */
+    @Column
+    private String uploadFileType;
+
+    /**
+     * 上报文件字符集
+     */
+    @Column
+    @Enumerated(EnumType.STRING)
+    private UploadFileCharSetEnum uploadFileCharSetEnum;
+
+    /**
+     * 上报协议
+     */
+    @Column
+    @Enumerated(EnumType.STRING)
+    private UploadProtocolEnum uploadProtocolEnum;
+
+    /**
+     * 上报D5000(1是、0否)
+     */
+    @Column
+    private String uploadD5000;
+
+    /**
+     * 上报对象备注
+     */
+    @Column
+    private String uploadObjectRemarks;
+
+    /**
+     * 场站编号(运维标识)
+     */
+    @Column
+    private String stationCode;
+
+}

+ 52 - 0
ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/entity/UploadURL.java

@@ -0,0 +1,52 @@
+package com.jiayue.ipfcst.common.data.entity;
+
+import com.jiayue.ipfcst.common.data.abst.AbstractBaseEntity;
+import com.jiayue.ipfcst.common.data.constant.enums.FileTypeEnum;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.hibernate.annotations.GenericGenerator;
+import org.springframework.core.annotation.Order;
+
+import javax.persistence.*;
+
+/**
+ * 上报路径
+ *
+ * @author tl
+ * @version 3.0
+ */
+@EqualsAndHashCode(callSuper = true)
+@Entity
+@Data
+public class UploadURL extends AbstractBaseEntity {
+
+    @Id
+    @Order(1)
+    @GeneratedValue(strategy = GenerationType.AUTO, generator = "myid")
+    @GenericGenerator(name = "myid", strategy = "com.jiayue.ipfcst.common.data.entity.id.CustomIDGenerator")
+    private Integer id;
+
+    //上报路径
+    @Column
+    private String uploadURL;
+
+    //上报对象id
+    @Column
+    private Integer uploadObjectId;
+
+    //上报通道id
+    @Column
+    private Integer uploadChannelId;
+
+    //文件类型
+    @Column
+    @Enumerated(EnumType.STRING)
+    private FileTypeEnum fileTypeEnum;
+
+    /**
+     * 场站编号(运维标识)
+     */
+    @Column
+    private String stationCode;
+
+}

+ 22 - 0
ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/repository/UploadFileChannelRepository.java

@@ -0,0 +1,22 @@
+package com.jiayue.ipfcst.common.data.repository;
+
+import com.jiayue.ipfcst.common.data.constant.enums.ChannelStatusEnum;
+import com.jiayue.ipfcst.common.data.entity.UploadFileChannel;
+
+import java.util.List;
+
+/**
+ * 上报文件通道表仓储
+ *
+ * @author xsl
+ * @version 3.0
+ */
+public interface UploadFileChannelRepository extends BaseRepository<UploadFileChannel, Integer> {
+    List<UploadFileChannel> findAllByUploadObjectId(Integer id);
+
+    List<UploadFileChannel> findByRemoteIp(String remoteIp);
+
+    void deleteAllByUploadObjectId(Integer uploadObjectId);
+
+    List<UploadFileChannel> findByChannelStatusEnum(ChannelStatusEnum channelStatusEnum);
+}

+ 14 - 0
ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/repository/UploadObjectRepository.java

@@ -0,0 +1,14 @@
+package com.jiayue.ipfcst.common.data.repository;
+
+import com.jiayue.ipfcst.common.data.entity.UploadObject;
+
+/**
+ * 上报对象表仓储
+ *
+ * @author xsl
+ * @version 3.0
+ */
+public interface UploadObjectRepository extends BaseRepository<UploadObject, Integer> {
+
+    UploadObject findByObjectNo(String no);
+}

+ 14 - 0
ipfcst-common/ipfcst-common-data/src/main/java/com/jiayue/ipfcst/common/data/repository/UploadURLRepository.java

@@ -0,0 +1,14 @@
+package com.jiayue.ipfcst.common.data.repository;
+
+import com.jiayue.ipfcst.common.data.entity.UploadURL;
+
+import java.util.List;
+
+public interface UploadURLRepository extends BaseRepository<UploadURL,Integer> {
+    List<UploadURL> findByUploadChannelId(Integer id);
+
+
+    void deleteByUploadChannelId(Integer uploadChannelId);
+
+    void deleteByUploadObjectId(Integer uploadObjectId);
+}

+ 287 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/Base102Service.java

@@ -0,0 +1,287 @@
+package com.jiayue.ipfcst.fileupload.IEC102;
+
+import com.jiayue.ipfcst.common.data.constant.enums.FileStatusEnum;
+import com.jiayue.ipfcst.common.data.constant.enums.FileTypeEnum;
+import com.jiayue.ipfcst.common.data.entity.UploadFileChannel;
+import com.jiayue.ipfcst.common.data.entity.UploadFileLog;
+import com.jiayue.ipfcst.common.data.entity.UploadFileLogDetail;
+import com.jiayue.ipfcst.common.data.entity.UploadObject;
+import com.jiayue.ipfcst.common.data.repository.UploadFileLogDetailRepository;
+import com.jiayue.ipfcst.common.data.repository.UploadFileLogRepository;
+import com.jiayue.ipfcst.console.service.SysParameterService;
+import com.jiayue.ipfcst.fileupload.service.UploadFileCodeService;
+import com.jiayue.ipfcst.fileupload.util.ByteUtil;
+import com.jiayue.ipfcst.fileupload.util.FileConstant;
+import com.jiayue.ipfcst.fileupload.util.FileMutableInteger;
+import com.jiayue.ipfcst.fileupload.util.FileUtil;
+import org.apache.commons.lang.time.DateFormatUtils;
+import org.slf4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * 102业务处理基础类
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Service
+public class Base102Service {
+  @Autowired
+  UploadFileLogRepository uploadFileLogRepository;
+  @Autowired
+  UploadFileLogDetailRepository uploadFileLogDetailRepository;
+  @Autowired
+  SysParameterService sysParameterService;
+  @Autowired
+  UploadFileCodeService uploadFileCodeService;
+
+  /**
+   * 统计文件上报成功或者失败并移动文件
+   *
+   * @param uploadFileChannel 通道信息
+   * @param faileReason       失败原因
+   */
+  public void totalFileUploadNums(UploadObject uploadObject, UploadFileChannel uploadFileChannel, String faileReason, Logger log) {
+    // 上报文件次数缓存加1
+    Map<String, FileMutableInteger> filterMap = FileConstant.uploadCountMap.entrySet().stream().filter(r -> uploadObject.getObjectNo().equals(r.getKey().substring(0, r.getKey().indexOf("@"))))
+      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
+        (oldValue, newValue) -> oldValue, LinkedHashMap::new));
+    // 上报文件名
+    String fileNameKey = "";
+    for (Map.Entry<String, FileMutableInteger> entry : filterMap.entrySet()) {
+      fileNameKey = entry.getKey();
+      break;
+    }
+    String[] keys = new String[3];
+    String tempFileName = fileNameKey;
+    log.info("UploadNums操作文件:"+tempFileName);
+    keys[0] = tempFileName.substring(0,tempFileName.indexOf("@",0));
+    tempFileName = tempFileName.substring(tempFileName.indexOf("@",0)+1);
+    keys[1] = tempFileName.substring(0,tempFileName.indexOf("@",0));
+    keys[2] = tempFileName.substring(tempFileName.indexOf("@",0)+1);
+
+    // 找到文件日志记录
+    //今日凌晨
+//    Long st = DateTimeUtil.getMillisecondsSubDay();
+    //明日凌晨
+//    Long et = DateTimeUtil.getMillisecondsSubDay() + 1000 * 60 * 60 * 24-1;
+
+//    Date fileCreateTime = FileConstant.fileShouldMomentMap.get(fileNameKey);
+    Integer id = FileConstant.fileShouldMomentMap.get(fileNameKey);
+    UploadFileLog uploadFileLog = null;
+    Optional<UploadFileLog> fileLogOptional = uploadFileLogRepository.findById(id);
+    if (fileLogOptional.isPresent()) {
+      uploadFileLog = fileLogOptional.get();
+    }
+
+    // 先判断是否查出日志,如果没有则认为文件是本地拷贝进去的
+    if (uploadFileLog == null) {
+      UploadFileLog newUploadFileLog = new UploadFileLog();
+      newUploadFileLog.setUploadObjectId(uploadObject.getId());
+      newUploadFileLog.setFileStatusEnum(FileStatusEnum.E1);
+      newUploadFileLog.setUploadProtocolEnum(uploadObject.getUploadProtocolEnum());
+      newUploadFileLog.setFileName(keys[2]);
+      newUploadFileLog.setFileTypeEnum(FileTypeEnum.valueOf(keys[1]));
+      newUploadFileLog.setUploadObjectName(uploadObject.getUploadObjectName());
+      newUploadFileLog.setUploadObjectNo(uploadObject.getObjectNo());
+      newUploadFileLog.setUploadCounter(0);
+      newUploadFileLog.setFileRemarks("当日没找到日志记录,系统插入一条");
+      uploadFileLog = uploadFileLogRepository.save(newUploadFileLog);
+    }
+
+    if ("".equals(faileReason)) {
+      UploadFileLogDetail uploadFileLogDetail = new UploadFileLogDetail();
+      uploadFileLogDetail.setUploadObjectName(uploadObject.getUploadObjectName());
+      uploadFileLogDetail.setUploadFileLogId(uploadFileLog.getId());
+      uploadFileLogDetail.setUploadTime(System.currentTimeMillis());
+      uploadFileLogDetail.setFileName(keys[2]);
+      uploadFileLogDetail.setUploadChanneId(uploadFileChannel.getId());
+      uploadFileLogDetail.setUploadChannelName(uploadFileChannel.getChannelName());
+      uploadFileLogDetail.setFileStatusEnum(FileStatusEnum.E2);
+//      fileMutableInteger.getUploadFileLogDetail().add(uploadFileLogDetail);
+      // 上报成功保存数据库,更新上报表,上报明细
+//      uploadFileLogRepository.updateFileUploadCount(FileStatusEnum.E2, fileMutableInteger.getValue(), uploadObject.getId(), keys[2]);
+      uploadFileLog.setFileStatusEnum(FileStatusEnum.E2);
+      uploadFileLog.setUploadCounter(uploadFileLog.getUploadCounter()+1);
+      uploadFileLogRepository.save(uploadFileLog);
+      uploadFileLogDetailRepository.save(uploadFileLogDetail);
+      // 将文件移动到成功目录
+      String srcPath = FileUtil.getFileUploadPath() + File.separator + "process" + File.separator + keys[0] + File.separator + keys[1] + File.separator + keys[2];
+      String targetPath = FileUtil.getFileUploadPath() + File.separator + "backups" + File.separator + keys[0] + File.separator + DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd") + File.separator + keys[1];
+
+      try {
+        FileUtil.move(srcPath, targetPath);
+      } catch (Exception e) {
+        log.error("上报成功移动文件失败",e);
+      }
+      // 清理缓存
+      FileConstant.uploadCountMap.remove(fileNameKey);
+      FileConstant.readyUploadFileMap.remove(fileNameKey);
+      FileConstant.fileShouldMomentMap.remove(fileNameKey);
+      FileConstant.fileContentMap.remove(fileNameKey);
+      FileConstant.againUploadFileMap.remove(fileNameKey);
+      FileConstant.filePackageNumMap.remove(fileNameKey);
+      log.info("上报对象编号:" + keys[0] + " 文件名:" + keys[2] + " 本次上报结果:成功");
+    } else {
+      // 保存上报失败明细表
+      UploadFileLogDetail uploadFileLogDetail = new UploadFileLogDetail();
+      uploadFileLogDetail.setUploadObjectName(uploadObject.getUploadObjectName());
+      uploadFileLogDetail.setUploadFileLogId(uploadFileLog.getId());
+      uploadFileLogDetail.setUploadTime(System.currentTimeMillis());
+      uploadFileLogDetail.setFileName(keys[2]);
+      uploadFileLogDetail.setUploadChanneId(uploadFileChannel.getId());
+      uploadFileLogDetail.setUploadChannelName(uploadFileChannel.getChannelName());
+      uploadFileLogDetail.setFileStatusEnum(FileStatusEnum.E3);
+      uploadFileLogDetail.setUploadFailureReason(faileReason);
+      uploadFileLog.setFileStatusEnum(FileStatusEnum.E3);
+      uploadFileLog.setUploadCounter(uploadFileLog.getUploadCounter()+1);
+      uploadFileLogRepository.save(uploadFileLog);
+      uploadFileLogDetailRepository.save(uploadFileLogDetail);
+
+      if (FileConstant.againUploadFileMap.get(fileNameKey)==null){
+        // 遇到失败,再给这个文件一次上报的机会
+        FileConstant.againUploadFileMap.put(fileNameKey,"");
+        log.info(fileNameKey+"上报结果失败,再报一次变量中存入");
+      }
+      else{
+        // 将文件移动到失败目录
+        String srcPath = FileUtil.getFileUploadPath() + File.separator + "process" + File.separator + keys[0] + File.separator + keys[1] + File.separator + keys[2];
+        String targetPath = FileUtil.getFileUploadPath() + File.separator + "error" + File.separator + keys[0] + File.separator + DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd") + File.separator + keys[1];
+        try {
+          FileUtil.move(srcPath, targetPath);
+        } catch (Exception e) {
+          log.error("将文件移动到失败目录",e);
+        }
+        FileConstant.readyUploadFileMap.remove(fileNameKey);
+        FileConstant.fileShouldMomentMap.remove(fileNameKey);
+        FileConstant.againUploadFileMap.remove(fileNameKey);
+      }
+      // 清理缓存
+      FileConstant.uploadCountMap.remove(fileNameKey);
+      FileConstant.fileContentMap.remove(fileNameKey);
+      FileConstant.filePackageNumMap.remove(fileNameKey);
+      log.info("上报对象编号:" + keys[0] + " 文件名:" + keys[2] + " 本次上报结果失败:" + faileReason);
+    }
+  }
+
+  /**
+   * 读取文件内容转16进制报文
+   *
+   * @param filePath
+   * @param charsetName
+   * @return
+   */
+  public String createFileMessageHex(String filePath, String charsetName, Logger log) {
+    InputStreamReader isr = null;
+    String fileMessageHex = "";
+    try {
+      isr = new InputStreamReader(new FileInputStream(filePath), "UTF-8");
+      StringBuffer sb = new StringBuffer("");
+      int len1 = 0;
+      while ((len1 = isr.read()) != -1) {
+        sb.append((char) len1);
+      }
+      isr.close();
+
+      String str = sb.toString();
+      fileMessageHex = ByteUtil.Byte2String(str.getBytes(charsetName));
+      fileMessageHex = fileMessageHex.replaceAll(" ", "");
+      ////////////////////以下注释生成本地文件/////////////////////
+//      int len = fileMessageHex.length();
+//      byte[] data = new byte[len / 2];
+//      for (
+//        int i = 0;
+//        i < len; i += 2) {
+//        data[i / 2] = (byte) ((Character.digit(fileMessageHex.charAt(i), 16) << 4)
+//          + Character.digit(fileMessageHex.charAt(i + 1), 16));
+//      }
+//      OutputStreamWriter osw = new OutputStreamWriter(new FileOutputStream("d:\\sdf.PVD"), charsetName);
+//      String res = new String(data);
+//      osw.write(res);
+//      osw.close();
+    } catch (Exception e) {
+      log.error("读取本地文件失败", e);
+    } finally {
+      if (isr != null) {
+        try {
+          isr.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    return fileMessageHex;
+  }
+
+  /**
+   * 根据通道过滤缓存Map的key是否存在
+   *
+   * @param uploadObjectNo
+   * @param map
+   * @return
+   */
+  public Map<String, UploadFileLog> filterFileByObjectNo(String uploadObjectNo, Map<String, UploadFileLog> map, Logger log) {
+    // 先过滤出上报对象下的所有文件
+    Map<String, UploadFileLog> filterMap = map.entrySet().stream().filter(r -> uploadObjectNo.equals(r.getKey().substring(0, r.getKey().indexOf("@"))))
+      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
+        (oldValue, newValue) -> oldValue, LinkedHashMap::new));
+    // 再将失效时间的文件移除
+    Map<String, UploadFileLog> fileterFileMap = new HashMap<>();
+    Date d = new Date();
+    for (Map.Entry<String, UploadFileLog> entry : filterMap.entrySet()) {
+      UploadFileLog u = entry.getValue();
+      if (u.getUploadFileEndTime() == null) {
+        // 有效的文件
+        fileterFileMap.put(entry.getKey(), u);
+      } else {
+        if (d.getTime() <= u.getUploadFileEndTime()) {
+          // 有效的文件
+          fileterFileMap.put(entry.getKey(), u);
+        } else {
+          // 过期文件
+          String[] keys = new String[3];
+          keys[0] = entry.getKey().substring(0,entry.getKey().indexOf("@",0));
+          keys[1] = entry.getKey().substring(entry.getKey().indexOf("@",1)+1,entry.getKey().indexOf("@",2));
+          keys[2] = entry.getKey().substring(entry.getKey().indexOf("@",2)+1);
+          String srcPath = FileUtil.getFileUploadPath() + File.separator + "process" + File.separator + keys[0] + File.separator + keys[1] + File.separator + keys[2];
+          String targetPath = FileUtil.getFileUploadPath() + File.separator + "expire" + File.separator + keys[0] + File.separator + DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd") + File.separator + keys[1];
+          try {
+            FileUtil.move(srcPath, targetPath);
+            log.info("上报对象编号:" + keys[0] + ",将过期的文件移到expire过期文件夹下:" + keys[2]);
+          } catch (Exception e) {
+            log.error("上报对象编号:" + keys[0] + ",将过期的文件移到expire过期文件夹下:" + keys[2] + "失败", e);
+          }
+          // 清理缓存
+          FileConstant.uploadCountMap.remove(entry.getKey());
+          FileConstant.againUploadFileMap.remove(entry.getKey());
+          FileConstant.readyUploadFileMap.remove(entry.getKey());
+          FileConstant.fileShouldMomentMap.remove(entry.getKey());
+          FileConstant.fileContentMap.remove(entry.getKey());
+        }
+      }
+    }
+    return fileterFileMap;
+  }
+
+  /**
+   * 获取文件CODE码
+   *
+   * @return
+   */
+  public Map<String, Integer> getFileCode() {
+    List<UploadFileCode> uploadFileCodeList = uploadFileCodeService.get();
+    Map<String, Integer> codeMap = new HashMap<>();
+    for (UploadFileCode uploadFileCode : uploadFileCodeList) {
+      codeMap.put(uploadFileCode.getFileTypeEnum().toString(), uploadFileCode.getFileTypeCode());
+    }
+    return codeMap;
+  }
+}

+ 12 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/NettyParent.java

@@ -0,0 +1,12 @@
+package com.jiayue.ipfcst.fileupload.IEC102;
+
+/**
+ * 启动类抽象父类
+ *
+ * @author xsl
+ * @version 3.0
+ */
+public abstract class NettyParent {
+  public abstract void start();
+  public abstract void destroy();
+}

+ 225 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102Decoder.java

@@ -0,0 +1,225 @@
+package com.jiayue.ipfcst.fileupload.IEC102;
+
+import ch.qos.logback.classic.Logger;
+import com.jiayue.ipfcst.common.core.util.SpringContextHolder;
+import com.jiayue.ipfcst.common.data.entity.UploadFileChannel;
+import com.jiayue.ipfcst.common.data.entity.UploadObject;
+import com.jiayue.ipfcst.fileupload.config.AppenderFactory;
+import com.jiayue.ipfcst.fileupload.service.UploadFileChannelService;
+import com.jiayue.ipfcst.fileupload.service.UploadObjectService;
+import com.jiayue.ipfcst.fileupload.util.ByteUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.ReferenceCountUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.xml.bind.DatatypeConverter;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * 102解码器
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Slf4j
+public class ServerFor102Decoder extends SimpleChannelInboundHandler {
+  //记录上次未读完的字节
+  ByteBuf tempMessage = Unpooled.buffer();
+
+  @Autowired
+  AppenderFactory appenderFactory = SpringContextHolder.getApplicationContext().getBean(AppenderFactory.class);
+  @Autowired
+  UploadObjectService uploadObjectService = SpringContextHolder.getApplicationContext().getBean(UploadObjectService.class);
+  @Autowired
+  UploadFileChannelService uploadFileChannelService = SpringContextHolder.getApplicationContext().getBean(UploadFileChannelService.class);
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+    InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
+    // 获取远程IP
+    String remoteIP = insocket.getAddress().getHostAddress();
+    // 根据远端IP获取上报对象报文日志
+    List<UploadFileChannel> uploadFileChannelList = uploadFileChannelService.get();
+    List<UploadFileChannel> uploadFileChannels = uploadFileChannelList.stream().filter(s -> s.getRemoteIp().equals(remoteIP)).collect(Collectors.toList());
+    if (uploadFileChannels.isEmpty()){
+      log.error("远端IP:"+remoteIP+"在通道中不存在,不接收对方内容!");
+      return;
+    }
+    UploadFileChannel uploadFileChannel = uploadFileChannels.get(0);
+    List<UploadObject> uploadObjectList = uploadObjectService.get();
+    UploadObject uploadObject = uploadObjectList.stream().filter(s -> s.getId().equals(uploadFileChannel.getUploadObjectId())).collect(Collectors.toList()).get(0);
+    Logger uploadLogger = appenderFactory.getLogger(uploadObject.getUploadObjectName(), uploadObject.getUploadProtocolEnum().getMessage(),uploadObject.getObjectNo());
+
+    ByteBuf msgCopy = (ByteBuf) msg;
+    int inSize = msgCopy.readableBytes();
+
+    if (inSize > 0) {
+      // tempMessage为缓存区,在此缓存解码报文
+      if (tempMessage.readableBytes() == 0) {
+        tempMessage = msgCopy.copy(0, inSize);
+      } else {
+        tempMessage.writeBytes(msgCopy);
+      }
+      if (log.isDebugEnabled()){
+        uploadLogger.debug("=========本次收到" + inSize + "字节========内容:" + ByteUtil.ByteBuf2String(msgCopy));
+        // 显示当前缓存里的内容
+        if (tempMessage.readableBytes() > 0) {
+          byte[] tempByte = new byte[tempMessage.readableBytes()];
+          tempMessage.readBytes(tempByte);
+          String tempByteInfo = DatatypeConverter.printHexBinary(tempByte);
+          uploadLogger.debug("当前缓存中字节数量:" + tempMessage.readableBytes() + "报文内容:" + tempByteInfo);
+        }
+      }
+
+      // 读下标重置
+      tempMessage.resetReaderIndex();
+      // 6个以上字节才报文解码
+      if (tempMessage.readableBytes() >= 6) {
+        // 此变量为了一串报文能继续往下解码
+        int i = 0;
+        while (true) {
+          String message = "";
+          // 检查10和68的下标位置
+          int s10 = tempMessage.indexOf(i, tempMessage.readableBytes(), (byte) 16);
+          int s68 = tempMessage.indexOf(i, tempMessage.readableBytes(), (byte) 104);
+          if (s10 == -1 && s68 == -1) {
+            // 10和68的都不存在,也就是都是-1,报文里没有关键字退出循环,等待接收下次内容
+            break;
+          } else if ((s10 > -1 && s68 > -1 && s10 < s68) || (s10 > -1 && s68 == -1)) {
+            // 调用10固定帧解码规则
+            message = handlerMessage10(s10, tempMessage,uploadLogger);
+            if ("".equals(message)) {
+              i = s10 + 1;
+            }
+          } else if ((s10 > -1 && s68 > -1 && s68 < s10) || (s68 > -1 && s10 == -1)) {
+            // 调用68可变帧解码规则
+            message = handlerMessage68(s68, tempMessage,uploadLogger);
+            if ("".equals(message)) {
+              i = s68 + 1;
+            }
+          }
+          if (!"".equals(message)) {
+            // 解码成功将下标重置0为了继续下一个解码循环
+            i = 0;
+            ctx.fireChannelRead(message);
+          }
+        }
+      }
+
+      // 缓存超过范围丢弃所有的内容
+      if (tempMessage.readableBytes() > 1024) {
+        byte[] clearByte = new byte[tempMessage.readableBytes()];
+        tempMessage.readBytes(clearByte);
+        String clearInfo = DatatypeConverter.printHexBinary(clearByte);
+        uploadLogger.info("无效报文超过范围数量舍弃:"+ clearInfo);
+        ReferenceCountUtil.release(tempMessage);
+        tempMessage = Unpooled.buffer();
+      }
+      if (uploadLogger.isDebugEnabled()){
+        // 显示解码后剩余的报文
+        if (tempMessage.readableBytes() > 0) {
+          byte[] leftOverByte = new byte[tempMessage.readableBytes()];
+          tempMessage.readBytes(leftOverByte);
+          String leftOverInfo = DatatypeConverter.printHexBinary(leftOverByte);
+          uploadLogger.debug("剩余的报文:"+ leftOverInfo);
+        }
+      }
+
+      tempMessage.resetReaderIndex();
+    }
+  }
+
+  /**
+   * 10固定帧报文处理
+   *
+   * @param s10       10所在缓存字节下标位置
+   * @param inMessage 缓存内容
+   * @return 符合固定帧规则的十六进制报文字符串
+   */
+  private String handlerMessage10(int s10, ByteBuf inMessage,Logger uploadLogger) {
+    int inMessageLength = inMessage.readableBytes();
+    String message = "";
+    // 调用10固定帧规则
+    if (s10 + 5 <= inMessageLength) {
+      byte e16 = inMessage.getByte(s10 + 5);
+      if (e16 == (byte) 22) {
+        // 找到了10开头并且16结尾的固定报文
+        ByteBuf gd10 = inMessage.copy(s10, 6);
+        byte[] kdst = new byte[gd10.readableBytes()];
+        gd10.readBytes(kdst);
+        message = DatatypeConverter.printHexBinary(kdst);
+        handlerLoseMessage(s10, inMessage,uploadLogger);
+        inMessage.readerIndex(s10 + 6);
+        // 删除报文到解码成功最后一位
+        inMessage.discardReadBytes();
+        ReferenceCountUtil.release(gd10);
+      }
+    }
+    return message;
+  }
+
+  /**
+   * 68可变帧报文处理
+   *
+   * @param s68       68所在缓存字节下标位置
+   * @param inMessage 缓存内容
+   * @return 符合固定帧规则的十六进制报文字符串
+   */
+  private String handlerMessage68(int s68, ByteBuf inMessage,Logger uploadLogger) {
+    int inMessageLength = inMessage.readableBytes();
+    String message = "";
+    // 判断第二个68
+    int second68 = inMessage.indexOf(s68 + 3, inMessageLength, (byte) 104);
+    if (second68 == s68 + 3) {
+      // 获取2个68之间的报文长度低高位
+      ByteBuf b = inMessage.copy(s68 + 1, 2);
+      byte[] dst = new byte[b.readableBytes()];
+      b.readBytes(dst);
+      // 将低高位反转
+      String messageLength = DatatypeConverter.printHexBinary(ByteUtil.bytesFlipping(dst));
+      // 算出第二个68后面数据段长度
+      int tempLength = ByteUtil.hexToDec(messageLength);
+      int end16 = second68 + tempLength + 2;
+      if (inMessageLength - 1 >= end16) {
+        // 满足长度,再判断是否以16结尾
+        if (inMessage.indexOf(end16, end16 + 1, (byte) 22) > -1) {
+          // 合法可变报文,将可变报文转成16进制给业务
+          ByteBuf kb = inMessage.copy(s68, 4 + tempLength + 2);
+          byte[] kdst = new byte[kb.readableBytes()];
+          kb.readBytes(kdst);
+          message = DatatypeConverter.printHexBinary(kdst);
+          handlerLoseMessage(s68, inMessage,uploadLogger);
+          inMessage.readerIndex(end16 + 1);
+          inMessage.discardReadBytes();
+          ReferenceCountUtil.release(kb);
+        }
+      }
+      ReferenceCountUtil.release(b);
+    }
+    return message;
+  }
+
+  /**
+   * 显示头之前没用的报文
+   *
+   * @param useHeadIndex
+   * @param inMessage
+   */
+  private void handlerLoseMessage(int useHeadIndex, ByteBuf inMessage,Logger uploadLogger) {
+    if (useHeadIndex > 0) {
+      // 舍弃的报文
+      ByteBuf loseMessage = inMessage.copy(0, useHeadIndex);
+      byte[] loseByte = new byte[loseMessage.readableBytes()];
+      loseMessage.readBytes(loseByte);
+      String loseInfo = DatatypeConverter.printHexBinary(loseByte);
+      uploadLogger.debug("舍弃的报文:"+ loseInfo);
+      ReferenceCountUtil.release(loseMessage);
+    }
+  }
+}

+ 21 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102Encoder.java

@@ -0,0 +1,21 @@
+package com.jiayue.ipfcst.fileupload.IEC102;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import java.math.BigInteger;
+
+/**
+ * 102编码器
+ *
+ * @author xsl
+ * @version 3.0
+ */
+public class ServerFor102Encoder extends MessageToByteEncoder<String> {
+    @Override
+    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
+      BigInteger bigint=new BigInteger(msg, 16);
+      out.writeBytes(bigint.toByteArray());
+    }
+}

+ 34 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102InitChannel.java

@@ -0,0 +1,34 @@
+package com.jiayue.ipfcst.fileupload.IEC102;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * netty服务端初始化
+ *
+ * @author xsl
+ * @version 3.0
+ */
+public class ServerFor102InitChannel extends ChannelInitializer<SocketChannel> {
+  private int readerIdleTime;
+
+  ServerFor102InitChannel(Integer readerIdleTime) {
+    this.readerIdleTime = readerIdleTime;
+  }
+
+  @Override
+  protected void initChannel(SocketChannel socketChannel) throws Exception {
+    socketChannel.pipeline().addLast(new IdleStateHandler(
+      readerIdleTime,
+      0,
+      0,
+      TimeUnit.SECONDS));
+    //添加编解码
+    socketChannel.pipeline().addLast("decoder", new ServerFor102Decoder());
+    socketChannel.pipeline().addLast("encoder", new ServerFor102Encoder());
+    socketChannel.pipeline().addLast(new ServerFor102TransitHandler(readerIdleTime));
+  }
+}

+ 422 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102NMService.java

@@ -0,0 +1,422 @@
+package com.jiayue.ipfcst.fileupload.IEC102;
+
+import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.core.util.StrUtil;
+import com.jiayue.ipfcst.common.data.entity.ElectricField;
+import com.jiayue.ipfcst.common.data.entity.UploadFileChannel;
+import com.jiayue.ipfcst.common.data.entity.UploadFileLog;
+import com.jiayue.ipfcst.common.data.entity.UploadObject;
+import com.jiayue.ipfcst.fileupload.dto.Validate102Dto;
+import com.jiayue.ipfcst.fileupload.util.FileConstant;
+import com.jiayue.ipfcst.fileupload.util.FileMutableInteger;
+import com.jiayue.ipfcst.fileupload.util.FileUtil;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * 102内蒙服务端业务处理类
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Service
+public class ServerFor102NMService extends Base102Service {
+  @Transactional(propagation = Propagation.REQUIRED)
+  public String handlerMessage(String receiveMessage, UploadObject uploadObject, UploadFileChannel uploadFileChannel, ElectricField electricField, Logger log, ChannelHandlerContext ctx) {
+    String[] rmArray = IEC102Uitl.stingToArray(receiveMessage);
+    String sendMessage = "";
+    String sendCtrlChinese = "";
+    String receiveCtrlChinese = "";
+
+    if (FileConstant.validateMessage.get(uploadFileChannel.getId()+"")!=null){
+      // 需要校验接收的帧是否正确
+      Validate102Dto validate102Dto = FileConstant.validateMessage.get(uploadFileChannel.getId()+"");
+      if (!validate102Dto.getMessageFirst().equals(rmArray[0])){
+        log.info(ctx.channel().remoteAddress()+" - "+"调度应该传"+validate102Dto.getMessageFirst()+"帧并且原因是"+validate102Dto.getMessageReason()+",实际传"+IEC102Uitl.delimiterStringBySpace(receiveMessage)+",断开通道重启");
+        channelCloseConnect(ctx,uploadObject,uploadFileChannel);
+        return "";
+      }
+    }
+
+    if ("10".equals(rmArray[0])) {
+      // 获取控制域,根据功能码判断业务流
+      String fc = rmArray[1].substring(1);
+      if (FileConstant.validateMessage.get(uploadFileChannel.getId()+"")!=null){
+        // 需要校验接收的帧是否正确
+        Validate102Dto validate102Dto = FileConstant.validateMessage.get(uploadFileChannel.getId()+"");
+        if (!fc.equals(validate102Dto.getMessageReason())){
+          log.info(ctx.channel().remoteAddress()+" - "+"调度应该传10原因是"+validate102Dto.getMessageReason()+",实际传"+IEC102Uitl.delimiterStringBySpace(receiveMessage)+",断开通道重启");
+          channelCloseConnect(ctx,uploadObject,uploadFileChannel);
+          return "";
+        }
+      }
+
+      if ("0".equals(fc)) {
+        receiveCtrlChinese = "复位通信";
+        // 回复链路
+        sendMessage = makeMessage10("00");
+        sendCtrlChinese = "确认复位";
+      } else if ("9".equals(fc)) {
+        receiveCtrlChinese = "召唤链路";
+        // 回复链路
+        sendMessage = makeMessage10("0B");
+        sendCtrlChinese = "回应链路请求帧";
+      } else if ("A".equals(fc)) {
+        receiveCtrlChinese = "召唤一级数据";
+        // 先判断文件上报次数累计的缓存变量是否存在
+        Map<String, FileMutableInteger> filterMap = FileConstant.uploadCountMap.entrySet().stream().filter(r -> uploadObject.getObjectNo().equals(r.getKey().substring(0, r.getKey().indexOf("@"))))
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
+            (oldValue, newValue) -> oldValue, LinkedHashMap::new));
+        // 上报文件名
+        String fileNameKey = "";
+        String typeSign = "";
+        if (MapUtil.isNotEmpty(filterMap)) {
+          sendCtrlChinese = "子站向主站传输文件内容";
+          // 不是第一次要文件了
+          for (Map.Entry<String, FileMutableInteger> entry : filterMap.entrySet()) {
+            fileNameKey = entry.getKey();
+            log.info(ctx.channel().remoteAddress()+" - "+"上传文件===>"+entry.getKey());
+            typeSign = "199";
+            break;
+          }
+        } else {
+          sendCtrlChinese = "子站向主站传输基本文件信息";
+          // 从待上报的缓存中取一个文件放入次数变量中
+          Map<String, UploadFileLog> readyUploadFileMap = super.filterFileByObjectNo(uploadObject.getObjectNo(), FileConstant.readyUploadFileMap, log);
+          for (Map.Entry<String, UploadFileLog> entry : readyUploadFileMap.entrySet()) {
+            // 从待上报缓存中取一个加入到统计变量中
+            FileMutableInteger fileMutableInteger = new FileMutableInteger(0);
+            FileConstant.uploadCountMap.put(entry.getKey(), fileMutableInteger);
+            log.info(ctx.channel().remoteAddress()+" - "+"找到文件===>"+entry.getKey());
+            fileNameKey = entry.getKey();
+            typeSign = "198";
+            break;
+          }
+          Validate102Dto dto = new Validate102Dto();
+          dto.setMessageFirst("68");
+          dto.setMessageReason("07,0D");
+          FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+        }
+        if ("".equals(fileNameKey)) {
+          sendCtrlChinese = "不正常的回复";
+          sendMessage = "1000FFFFFE16";
+          log.info(ctx.channel().remoteAddress()+" - "+"主站不正常的调用,导致取不到文件无法上报");
+        } else {
+          // 生成传输文件的68帧
+          sendMessage = uploadFile68(typeSign, fileNameKey, electricField, uploadObject, uploadFileChannel, log);
+        }
+      } else if ("B".equals(fc)) {
+        receiveCtrlChinese = "召唤二级数据";
+        // 获取此通道下的上报文件
+        Map<String, UploadFileLog> readyUploadFileMap = super.filterFileByObjectNo(uploadObject.getObjectNo(), FileConstant.readyUploadFileMap, log);
+
+        if (MapUtil.isNotEmpty(readyUploadFileMap)) {
+          // 文件缓存中有上报文件
+          sendMessage = makeMessage10("29");
+          sendCtrlChinese = "希望向主站传输一级数据";
+          Validate102Dto dto = new Validate102Dto();
+          dto.setMessageFirst("10");
+          dto.setMessageReason("A");
+          FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+        } else {
+          // 没有文件
+          sendMessage = makeMessage10("09");
+          sendCtrlChinese = "没有文件需要上报";
+          Validate102Dto dto = new Validate102Dto();
+          dto.setMessageFirst("10");
+          dto.setMessageReason("B");
+          FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+        }
+      }
+    } else {
+      // 68开头报文判断传输原因
+      String cot = rmArray[9];
+      if ("07".equals(cot)) {
+        receiveCtrlChinese = "主站允许本次文件请求";
+        sendMessage = "1020FFFF1E16";
+        sendCtrlChinese = "确认有一级数据";
+        Validate102Dto dto = new Validate102Dto();
+        dto.setMessageFirst("10");
+        dto.setMessageReason("A");
+        FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+      } else {
+        if ("0A".equals(cot)) {
+          // 上报成功
+          receiveCtrlChinese = "文件成功接收结束";
+          super.totalFileUploadNums(uploadObject, uploadFileChannel, "", log);
+        } else {
+          // 上报失败
+          receiveCtrlChinese = "error";
+          if ("0D".equals(cot)) {
+            receiveCtrlChinese = "文件重复传输";
+          } else if ("0B".equals(cot)) {
+            receiveCtrlChinese = "文件名错误";
+          } else if ("0C".equals(cot)) {
+            receiveCtrlChinese = "文件过大(大于1MB)";
+          } else if ("0E".equals(cot)) {
+            receiveCtrlChinese = "文件类型错误";
+          } else if ("10".equals(cot)) {
+            receiveCtrlChinese = "文件接收错误(长度、MD5校验等),发送方准备重新传输该文件";
+          }
+          super.totalFileUploadNums(uploadObject, uploadFileChannel, receiveCtrlChinese, log);
+        }
+        sendCtrlChinese = "对上一条命令确认";
+        sendMessage = "1000FFFFFE16";
+        Validate102Dto dto = new Validate102Dto();
+        dto.setMessageFirst("10");
+        dto.setMessageReason("B");
+        FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+      }
+    }
+    log.info(ctx.channel().remoteAddress()+" - "+"接收报文:[" + receiveCtrlChinese + "]" + IEC102Uitl.delimiterStringBySpace(receiveMessage));
+    log.info(ctx.channel().remoteAddress()+" - "+"发送报文:[" + sendCtrlChinese + "]" + IEC102Uitl.delimiterStringBySpace(sendMessage));
+    return sendMessage;
+  }
+
+  /**
+   * 生成10帧报文
+   * 格式说明:10|控制域|地址低字节|地址高字节|校验和|16
+   *
+   * @param ctrl 控制域字节
+   * @return 10帧报文
+   */
+  private String makeMessage10(String ctrl) {
+    String[] returnMessage = {"10", "ctrl", "FF", "FF", "crc", "16"};
+    String crc = IEC102Uitl.makeChecksum(ctrl + returnMessage[2] + returnMessage[3]);
+    returnMessage[1] = ctrl;
+    returnMessage[4] = crc;
+    String sendMessage = ArrayUtil.join(returnMessage, "");
+    return sendMessage.toUpperCase();
+  }
+
+  /**
+   * 传输文件的68帧报文
+   * crc校验和:从[控制域]到[校验和]之前
+   * 帧长:从[控制域]到[校验和]之前所有字节数
+   *
+   * @return 68帧报文
+   */
+  private String uploadFile68(String typeSign, String fileNameKey, ElectricField electricField, UploadObject uploadObject, UploadFileChannel uploadFileChannel, Logger log) {
+    String[] tempKey = new String[3];
+    tempKey[0] = fileNameKey.substring(0,fileNameKey.indexOf("@",0));
+    tempKey[1] = fileNameKey.substring(fileNameKey.indexOf("@",1)+1,fileNameKey.indexOf("@",2));
+    tempKey[2] = fileNameKey.substring(fileNameKey.indexOf("@",2)+1);
+    String charsetName = uploadObject.getUploadFileCharSetEnum().getMessage();
+    String sendMessage = "";
+    // 获取本地物理文件路径
+    String filePath = FileUtil.getFileUploadPath() + File.separator + "process" + File.separator + tempKey[0] + File.separator + tempKey[1] + File.separator + tempKey[2];
+
+    if ("198".equals(typeSign)) {
+      // 子站向主站传输文件基本信息(68|帧长低字节|帧长高字节|68|控制域|地址低字节|地址高字节|类型标识|可变限定词|传输原因|设备地址低字节|设备地址高字节|记录地址|文件名字节|文件类型|文件长度|文件内容MD5数字签名|校验和|16)
+      String[] returnMessage = {"68", "mlLow", "mlHeight", "68", "28", "FF", "FF", "C6", "01", "06", "FF", "FF", "00", "fileNameByte", "fileType", "fileContentAllLength", "fileContentMD5", "crc", "16"};
+      // 生成文件名
+      String fileName = tempKey[2];
+      byte[] tempFileNameBytes = null;
+      try {
+        tempFileNameBytes = fileName.getBytes(charsetName);
+      } catch (Exception e) {
+        log.error("转换文件名编码失败", e);
+      }
+      // 获取文件名称长度
+      String fileNameLength = uploadFileChannel.getUploadFileNameLengthEnum().getMessage();
+      String fileNameByte = ByteUtil.Byte2String(tempFileNameBytes);
+      returnMessage[13] = StrUtil.padAfter(fileNameByte.replace(" ", ""), Integer.parseInt(fileNameLength) * 2, '0');
+      // 生成类型标识
+      Map<String, Integer> codeMap = super.getFileCode();
+      returnMessage[14] = Integer.toHexString(codeMap.get(tempKey[1]));
+      String fileContentByte = "";
+      String tempContent = "";
+      try {
+        fileContentByte = nmCreateFileMessageHex(filePath, log, uploadObject.getId(), fileNameKey);
+        tempContent = new String(fileContentByte);
+        fileContentByte = ByteUtil.Byte2String(fileContentByte.getBytes(charsetName));
+      } catch (FileNotFoundException e) {
+        log.error("没找到文件", e);
+        totalFileUploadNums(uploadObject, uploadFileChannel, e.toString(), log);
+      } catch (Exception e) {
+        log.error("读取本地文件失败", e);
+        totalFileUploadNums(uploadObject, uploadFileChannel, e.toString(), log);
+      }
+      fileContentByte = fileContentByte.replaceAll(" ", "");
+      returnMessage[15] = ByteUtil.bytesToHexStringFromHighToLow(ByteUtil.intToBytesFromLowToHigh(fileContentByte.length() / 2));
+      try {
+        returnMessage[16] = FileUtil.getMD5(tempContent.getBytes(charsetName));
+      } catch (Exception e) {
+        log.error("文件内容加密MD5失败", e);
+      }
+      // 计算校验和,获取从[控制域]到[校验和]之前的报文
+      StringBuffer tempSB = new StringBuffer("");
+      for (int i = 4; i <= 16; i++) {
+        tempSB.append(returnMessage[i]);
+      }
+      returnMessage[17] = IEC102Uitl.makeChecksum(tempSB.toString());
+      // 计算报文的帧长
+      String ml = StrUtil.padAfter(ByteUtil.decToHex(tempSB.length() / 2), 4, '0');
+      String mlLow = ml.substring(0, 2);
+      String mlHeight = ml.substring(2);
+      returnMessage[1] = mlLow;
+      returnMessage[2] = mlHeight;
+      sendMessage = ArrayUtil.join(returnMessage, "");
+    } else {
+      // 子站向主站传输文件内容(68|帧长低字节|帧长高字节|68|控制域|地址低字节|地址高字节|类型标识|可变限定词|传输原因|设备地址低字节|设备地址高字节|记录地址|文件内容在文件的起始地址|本帧文件数据长度|文件数据|校验和|16)
+      String[] returnMessage = {"68", "mlLow", "mlHeight", "68", "ctrl", "FF", "FF", "C7", "01", "reason", "FF", "FF", "00", "fileContentStartIndex", "fileContentLength", "fileContentByte", "crc", "16"};
+      String fileContentStartIndex = "";
+      // 获取缓存中文件内容
+      Map<String, String> fileContentMap = new HashMap<>();
+
+      String fileContentByte = "";
+      if (FileConstant.fileContentByIdxMap.get(fileNameKey) == null) {
+        // 缓存中没有文件内容则新生成文件内容报文
+        try {
+          fileContentByte = nmCreateFileMessageHex(filePath, log, uploadObject.getId(), fileNameKey);
+          fileContentByte = ByteUtil.Byte2String(fileContentByte.getBytes(charsetName));
+        } catch (FileNotFoundException e) {
+          log.error("没找到文件", e);
+          totalFileUploadNums(uploadObject, uploadFileChannel, e.toString(), log);
+        } catch (Exception e) {
+          log.error("读取本地文件失败", e);
+          totalFileUploadNums(uploadObject, uploadFileChannel, e.toString(), log);
+        }
+        fileContentByte = fileContentByte.replaceAll(" ", "");
+        fileContentMap.put("contentByteIdx", "0");
+      } else {
+        fileContentMap = FileConstant.fileContentByIdxMap.get(fileNameKey);
+        fileContentByte = fileContentMap.get("fileContent");
+      }
+      // 获取通道单次文件传输字节
+      String singleByte = uploadFileChannel.getUploadFileSingleByteEnum().getMessage();
+      String ctrl = "";
+      String reason = "";
+      if (fileContentByte.length() <= Integer.parseInt(singleByte) * 2) {
+        // 一次可以传输完,生成控制域码
+        ctrl = "08";
+        reason = "08";
+
+        Integer idx = Integer.parseInt(fileContentMap.get("contentByteIdx"));
+        if (idx != 0) {
+          idx = idx + 1;
+        }
+        fileContentStartIndex = ByteUtil.bytesToHexStringFromHighToLow(ByteUtil.intToBytesFromLowToHigh(idx));
+        FileConstant.fileContentByIdxMap.remove(fileNameKey);
+        Validate102Dto dto = new Validate102Dto();
+        dto.setMessageFirst("68");
+        dto.setMessageReason("0A");
+        FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+      } else {
+        // 生成不是最后一帧控制域码
+        ctrl = "28";
+        reason = "09";
+        // 将剩余的文件内容报文存入缓存中
+        Map<String, String> tempContentMap = new HashMap<>();
+        tempContentMap.put("fileContent", fileContentByte.substring(Integer.parseInt(singleByte) * 2));
+        // 上次字节结束的位置
+        Integer lastIdx = Integer.parseInt(fileContentMap.get("contentByteIdx"));
+        if (lastIdx == 0) {
+          tempContentMap.put("contentByteIdx", String.valueOf(Integer.parseInt(singleByte) - 1));
+          fileContentStartIndex = ByteUtil.bytesToHexStringFromHighToLow(ByteUtil.intToBytesFromLowToHigh(0));
+        } else {
+          tempContentMap.put("contentByteIdx", String.valueOf(lastIdx + Integer.parseInt(singleByte)));
+          fileContentStartIndex = ByteUtil.bytesToHexStringFromHighToLow(ByteUtil.intToBytesFromLowToHigh(lastIdx + 1));
+        }
+        FileConstant.fileContentByIdxMap.put(fileNameKey, tempContentMap);
+        fileContentByte = fileContentByte.substring(0, Integer.parseInt(singleByte) * 2);
+
+        Validate102Dto dto = new Validate102Dto();
+        dto.setMessageFirst("10");
+        dto.setMessageReason("A");
+        FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+      }
+
+      returnMessage[4] = ctrl;
+      returnMessage[9] = reason;
+      returnMessage[13] = fileContentStartIndex;
+      returnMessage[14] = ByteUtil.decToHex(fileContentByte.length() / 2);
+      returnMessage[15] = fileContentByte;
+
+      // 计算校验和,获取从[控制域]到[校验和]之前的报文
+      StringBuffer tempSB = new StringBuffer("");
+      for (int i = 4; i <= 15; i++) {
+        tempSB.append(returnMessage[i]);
+      }
+      returnMessage[16] = IEC102Uitl.makeChecksum(tempSB.toString());
+      // 计算报文的帧长
+      String ml = StrUtil.padAfter(ByteUtil.decToHex(tempSB.length() / 2), 4, '0');
+      String mlLow = ml.substring(0, 2);
+      String mlHeight = ml.substring(2);
+      returnMessage[1] = mlLow;
+      returnMessage[2] = mlHeight;
+      sendMessage = ArrayUtil.join(returnMessage, "");
+    }
+    return sendMessage.toUpperCase();
+  }
+
+
+  /**
+   * 读取文件内容转16进制报文
+   *
+   * @param filePath
+   * @param log
+   * @return
+   */
+  private String nmCreateFileMessageHex(String filePath, Logger log, Integer uploadObjectId, String fileNameKey) throws Exception {
+    InputStreamReader isr = null;
+    String fileMessageHex = "";
+    try {
+      isr = new InputStreamReader(new FileInputStream(filePath), "UTF-8");
+      StringBuffer sb = new StringBuffer("");
+      int len1 = 0;
+      while ((len1 = isr.read()) != -1) {
+        sb.append((char) len1);
+      }
+      isr.close();
+      fileMessageHex = sb.toString();
+    } catch (Exception e) {
+      log.error("没找到文件", e);
+      throw e;
+    } finally {
+      if (isr != null) {
+        try {
+          isr.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    return fileMessageHex;
+  }
+
+  public void channelCloseConnect(ChannelHandlerContext ctx, UploadObject uploadObject, UploadFileChannel uploadFileChannel){
+    // 传输帧类型不对,断开通道重启
+    Iterator<Map.Entry<String, FileMutableInteger>> countMap = FileConstant.uploadCountMap.entrySet().iterator();
+    while (countMap.hasNext()){
+      Map.Entry<String, FileMutableInteger> entry = countMap.next();
+      String key = entry.getKey();
+      if (key.contains(uploadObject.getObjectNo() + "@")){
+        countMap.remove();
+      }
+    }
+
+    Iterator<Map.Entry<String, String>> contentMap = FileConstant.fileContentMap.entrySet().iterator();
+    while (contentMap.hasNext()){
+      Map.Entry<String,String> entry = contentMap.next();
+      String key = entry.getKey();
+      if (key.contains(uploadObject.getObjectNo() + "@")){
+        contentMap.remove();
+      }
+    }
+    FileConstant.validateMessage.remove(uploadFileChannel.getId()+"");
+    ctx.close();
+  }
+}
+

+ 347 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102StandardService.java

@@ -0,0 +1,347 @@
+package com.jiayue.ipfcst.fileupload.IEC102;
+
+import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.core.util.StrUtil;
+import com.jiayue.ipfcst.common.data.entity.ElectricField;
+import com.jiayue.ipfcst.common.data.entity.UploadFileChannel;
+import com.jiayue.ipfcst.common.data.entity.UploadFileLog;
+import com.jiayue.ipfcst.common.data.entity.UploadObject;
+import com.jiayue.ipfcst.fileupload.dto.Validate102Dto;
+import com.jiayue.ipfcst.fileupload.util.FileConstant;
+import com.jiayue.ipfcst.fileupload.util.FileMutableInteger;
+import com.jiayue.ipfcst.fileupload.util.FileUtil;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * 102服务端业务处理类
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Service
+public class ServerFor102StandardService extends Base102Service {
+  @Transactional(propagation = Propagation.REQUIRED)
+  public String handlerMessage(String receiveMessage, UploadObject uploadObject, UploadFileChannel uploadFileChannel, ElectricField electricField, Logger log, ChannelHandlerContext ctx) {
+    String[] rmArray = IEC102Uitl.stingToArray(receiveMessage);
+    String sendMessage = "";
+    String sendCtrlChinese = "";
+    String receiveCtrlChinese = "";
+
+    if (FileConstant.validateMessage.get(uploadFileChannel.getId()+"")!=null){
+      // 需要校验接收的帧是否正确
+      Validate102Dto validate102Dto = FileConstant.validateMessage.get(uploadFileChannel.getId()+"");
+      if (!validate102Dto.getMessageFirst().equals(rmArray[0])){
+        log.info(ctx.channel().remoteAddress()+" - "+"调度应该传"+validate102Dto.getMessageFirst()+"帧并且原因是"+validate102Dto.getMessageReason()+",实际传"+IEC102Uitl.delimiterStringBySpace(receiveMessage)+",断开通道重启");
+        super.totalFileUploadNums(uploadObject,uploadFileChannel, "调度应该传"+validate102Dto.getMessageFirst()+"帧,实际传"+IEC102Uitl.delimiterStringBySpace(receiveMessage), log);
+        channelCloseConnect(ctx,uploadObject,uploadFileChannel);
+        return "";
+      }
+    }
+
+    if ("10".equals(rmArray[0])) {
+      // 获取控制域,根据功能码判断业务流
+      String fc = rmArray[1].substring(1);
+
+      if (FileConstant.validateMessage.get(uploadFileChannel.getId()+"")!=null){
+        // 需要校验接收的帧是否正确
+        Validate102Dto validate102Dto = FileConstant.validateMessage.get(uploadFileChannel.getId()+"");
+        if (!"".equals(validate102Dto.getMessageReason())){
+          if (!fc.equals(validate102Dto.getMessageReason())){
+            log.info(ctx.channel().remoteAddress()+" - "+"调度应该传10原因是"+validate102Dto.getMessageReason()+",实际传"+IEC102Uitl.delimiterStringBySpace(receiveMessage)+",断开通道重启");
+            channelCloseConnect(ctx,uploadObject,uploadFileChannel);
+            return "";
+          }
+        }
+      }
+
+      if ("0".equals(fc)) {
+        receiveCtrlChinese = "复位通信";
+        // 回复链路
+        sendMessage = makeMessage10("00");
+        sendCtrlChinese = "确认复位";
+      } else if ("3".equals(fc)) {
+        receiveCtrlChinese = "传送数据";
+      } else if ("9".equals(fc)) {
+        receiveCtrlChinese = "召唤链路";
+        // 回复链路
+        sendMessage = makeMessage10("0B");
+        sendCtrlChinese = "回应链路请求帧";
+      } else if ("A".equals(fc)) {
+        receiveCtrlChinese = "召唤一级数据";
+        // 先判断文件上报次数累计的缓存变量是否存在
+        Map<String, FileMutableInteger> filterMap = FileConstant.uploadCountMap.entrySet().stream().filter(r -> Integer.parseInt(uploadObject.getObjectNo())== Integer.parseInt(r.getKey().substring(0, r.getKey().indexOf("@"))))
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
+            (oldValue, newValue) -> oldValue, LinkedHashMap::new));
+        // 上报文件名
+        String fileNameKey = "";
+        if (MapUtil.isNotEmpty(filterMap)) {
+          // 不是第一次要文件了
+          for (Map.Entry<String, FileMutableInteger> entry : filterMap.entrySet()) {
+            fileNameKey = entry.getKey();
+            log.info(ctx.channel().remoteAddress()+" - "+"上传文件===>"+entry.getKey());
+            break;
+          }
+        } else {
+          // 从待上报的缓存中取一个文件放入次数变量中
+          Map<String, UploadFileLog> readyUploadFileMap = super.filterFileByObjectNo(uploadObject.getObjectNo(), FileConstant.readyUploadFileMap,log);
+          for (Map.Entry<String, UploadFileLog> entry : readyUploadFileMap.entrySet()) {
+            // 从待上报缓存中取一个加入到累计次数变量中
+            FileMutableInteger fileMutableInteger = new FileMutableInteger(0);
+            FileConstant.uploadCountMap.put(entry.getKey(), fileMutableInteger);
+            log.info(ctx.channel().remoteAddress()+" - "+"找到文件===>"+entry.getKey());
+            fileNameKey = entry.getKey();
+            break;
+          }
+        }
+        if ("".equals(fileNameKey)) {
+          sendCtrlChinese = "没有找到上报文件";
+          sendMessage = makeMessage10("09");
+          FileConstant.validateMessage.remove(uploadFileChannel.getId()+"");
+        } else {
+          // 生成传输文件的68帧
+          sendMessage = uploadFile68(fileNameKey, electricField, uploadObject, uploadFileChannel, log);
+          sendCtrlChinese = "通道:"+uploadFileChannel.getChannelName()+"上报:"+fileNameKey;
+        }
+      } else if ("B".equals(fc)) {
+        receiveCtrlChinese = "召唤二级数据";
+        // 获取此通道下的上报文件
+        Map<String, UploadFileLog> readyUploadFileMap = super.filterFileByObjectNo(uploadObject.getObjectNo(), FileConstant.readyUploadFileMap,log);
+
+        if (MapUtil.isNotEmpty(readyUploadFileMap)) {
+          // 文件缓存中有上报文件
+          sendMessage = makeMessage10("29");
+          sendCtrlChinese = "希望向主站传输一级数据";
+          Validate102Dto dto = new Validate102Dto();
+          dto.setMessageFirst("10");
+          dto.setMessageReason("A");
+          FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+        } else {
+          // 没有文件
+          sendMessage = makeMessage10("09");
+          sendCtrlChinese = "没有文件需要上报";
+          Validate102Dto dto = new Validate102Dto();
+          dto.setMessageFirst("10");
+          dto.setMessageReason("B");
+          FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+        }
+      } else if ("C".equals(fc)) {
+        receiveCtrlChinese = "下发数据通知";
+      }
+      log.info(ctx.channel().remoteAddress()+" - "+"接收报文:[" + receiveCtrlChinese + "]" + IEC102Uitl.delimiterStringBySpace(receiveMessage));
+    } else {// 68开头报文
+      // 判断传输原因
+      String cot = rmArray[9];
+      String[] send68Array = ArrayUtil.clone(rmArray);
+      if ("0A".equals(cot)) {
+        receiveCtrlChinese = "文件接收结束";
+        send68Array[9] = "0B";
+        // 计算校验和
+        String tempStr = StringUtils.join(send68Array, "");
+        String crc = IEC102Uitl.makeChecksum(tempStr.substring(8, 34));
+        send68Array[17] = crc;
+        sendCtrlChinese = "确认文件传输结束";
+        log.info(ctx.channel().remoteAddress()+" - "+"接收报文:[" + receiveCtrlChinese + "]" + IEC102Uitl.delimiterStringBySpace(receiveMessage));
+        // 文件状态成功
+        super.totalFileUploadNums(uploadObject,uploadFileChannel, "", log);
+        sendMessage = ArrayUtil.join(send68Array, "");
+      } else {
+        // 文件状态失败
+        if ("0D".equals(cot)) {
+          receiveCtrlChinese = "文件重复传输";
+          send68Array[9] = "0E";
+          // 计算校验和
+          String tempStr = StringUtils.join(send68Array, "");
+          String crc = IEC102Uitl.makeChecksum(tempStr.substring(8, 26));
+          send68Array[13] = crc;
+          sendCtrlChinese = "确认文件重复传输";
+        } else if ("0F".equals(cot)) {
+          receiveCtrlChinese = "文件长度超出规定最大长度";
+          send68Array[9] = "10";
+          // 计算校验和
+          String tempStr = StringUtils.join(send68Array, "");
+          String crc = IEC102Uitl.makeChecksum(tempStr.substring(8, 26));
+          send68Array[13] = crc;
+          sendCtrlChinese = "确认传输文件过长";
+        } else if ("11".equals(cot)) {
+          receiveCtrlChinese = "文件名格式不符合要求";
+          send68Array[9] = "12";
+          // 计算校验和
+          String tempStr = StringUtils.join(send68Array, "");
+          String crc = IEC102Uitl.makeChecksum(tempStr.substring(8, 26));
+          send68Array[13] = crc;
+          sendCtrlChinese = "确认文件名格式不符合要求";
+        } else if ("13".equals(cot)) {
+          receiveCtrlChinese = "传输单帧报文长度过长";
+          send68Array[9] = "14";
+          // 计算校验和
+          String tempStr = StringUtils.join(send68Array, "");
+          String crc = IEC102Uitl.makeChecksum(tempStr.substring(8, 26));
+          send68Array[13] = crc;
+          sendCtrlChinese = "确认单帧报文长度过长";
+        } else {
+          receiveCtrlChinese = "接收的内容程序没做判断";
+          sendCtrlChinese = "不是正常的报文回复内容";
+        }
+        log.info(ctx.channel().remoteAddress()+" - "+"接收报文:[" + receiveCtrlChinese + "]" + IEC102Uitl.delimiterStringBySpace(receiveMessage));
+        super.totalFileUploadNums(uploadObject,uploadFileChannel, sendCtrlChinese, log);
+        sendMessage = ArrayUtil.join(send68Array, "");
+      }
+      Validate102Dto dto = new Validate102Dto();
+      dto.setMessageFirst("10");
+      dto.setMessageReason("");
+      FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+    }
+    log.info(ctx.channel().remoteAddress()+" - "+"发送报文:[" + sendCtrlChinese + "]" + IEC102Uitl.delimiterStringBySpace(sendMessage));
+    return sendMessage;
+  }
+
+  /**
+   * 生成10帧报文
+   * 格式说明:10|控制域|地址低字节|地址高字节|校验和|16
+   *
+   * @param ctrl 控制域字节
+   * @return 10帧报文
+   */
+  private String makeMessage10(String ctrl) {
+    String[] returnMessage = {"10", "ctrl", "FF", "FF", "crc", "16"};
+    String crc = IEC102Uitl.makeChecksum(ctrl + returnMessage[2] + returnMessage[3]);
+    returnMessage[1] = ctrl;
+    returnMessage[4] = crc;
+    String sendMessage = ArrayUtil.join(returnMessage, "");
+    return sendMessage.toUpperCase();
+  }
+
+  /**
+   * 传输文件的68帧报文
+   * 格式说明:68|帧长低字节|帧长高字节|68|控制域|地址低字节|地址高字节|类型标识|可变限定词|传输原因|设备地址低字节|设备地址高字节|记录地址|文件名字节|文件内容字节|校验和|16
+   * crc校验和:从[控制域]到[校验和]之前
+   * 帧长:从[控制域]到[校验和]之前所有字节数
+   *
+   * @return 68帧报文
+   */
+  private String uploadFile68(String fileNameKey, ElectricField electricField, UploadObject uploadObject, UploadFileChannel uploadFileChannel, Logger log) {
+    String[] returnMessage = {"68", "mlLow", "mlHeight", "68", "ctrl", "FF", "FF", "typeCode", "01", "reason", "FF", "FF", "00", "fileNameByte", "fileContentByte", "crc", "16"};
+    String[] tempKey = new String[3];
+    String tempFileName = fileNameKey;
+    tempKey[0] = tempFileName.substring(0,tempFileName.indexOf("@",0));
+    tempFileName = tempFileName.substring(tempFileName.indexOf("@",0)+1);
+    tempKey[1] = tempFileName.substring(0,tempFileName.indexOf("@",0));
+    tempKey[2] = tempFileName.substring(tempFileName.indexOf("@",0)+1);
+
+    // 生成文件名及内容
+    String fileName = tempKey[2];
+    byte[] tempFileNameBytes = null;
+    String charsetName = "";
+    if ("E42".equals(electricField.getProvinceEnum().toString()) && ("E9".equals(tempKey[1]) || "E74".equals(tempKey[1])|| "E92".equals(tempKey[1]))){
+      // 湖北省新风机文件用GBK上报,其他用UTF-8
+      charsetName = "GBK";
+    }
+    else{
+      charsetName = uploadObject.getUploadFileCharSetEnum().getMessage();
+    }
+    try {
+      tempFileNameBytes = fileName.getBytes(charsetName);
+    } catch (Exception e) {
+      log.error("转换文件名编码失败", e);
+    }
+    // 获取文件名称长度
+    String fileNameLength = uploadFileChannel.getUploadFileNameLengthEnum().getMessage();
+    String fileNameByte = ByteUtil.Byte2String(tempFileNameBytes);
+    fileNameByte = StrUtil.padAfter(fileNameByte.replace(" ", ""), Integer.parseInt(fileNameLength) * 2, '0');
+    // 获取本地物理文件路径
+    String filePath = FileUtil.getFileUploadPath() + File.separator + "process" + File.separator + tempKey[0] + File.separator + tempKey[1] + File.separator + tempKey[2];
+
+    String fileContentByte = FileConstant.fileContentMap.get(fileNameKey);
+    if (StrUtil.hasBlank(fileContentByte)) {
+      // 缓存中没有文件内容则新生成文件内容报文
+      fileContentByte = super.createFileMessageHex(filePath, charsetName, log);
+    }
+    // 生成不是最后一帧控制域码
+    String ctrl = "28";
+    String reason = "08";
+    // 获取通道单次文件传输字节
+    String singleByte = uploadFileChannel.getUploadFileSingleByteEnum().getMessage();
+    if (fileContentByte.length() <= Integer.parseInt(singleByte) * 2) {
+      // 一次可以传输完,生成控制域码
+      ctrl = "08";
+      reason = "07";
+
+      Validate102Dto dto = new Validate102Dto();
+      dto.setMessageFirst("68");
+      dto.setMessageReason("0A");
+      FileConstant.validateMessage.put(uploadFileChannel.getId()+"",dto);
+
+    } else {
+      // 将剩余的文件内容报文存入缓存中
+      FileConstant.fileContentMap.put(fileNameKey, fileContentByte.substring(Integer.parseInt(singleByte) * 2));
+      fileContentByte = fileContentByte.substring(0, Integer.parseInt(singleByte) * 2);
+
+//      Validate102Dto dto = new Validate102Dto();
+//      dto.setMessageFirst("10");
+//      dto.setMessageReason("A");
+      FileConstant.validateMessage.remove(uploadFileChannel.getId()+"");
+    }
+    // 生成类型标识
+//    List<UploadFileCode> uploadFileCodeList = uploadFileCodeService.get();
+    Map<String,Integer> codeMap = super.getFileCode();
+    String typeCode = Integer.toHexString(codeMap.get(tempKey[1]));
+
+    returnMessage[4] = ctrl;
+    returnMessage[7] = typeCode;
+    returnMessage[9] = reason;
+    returnMessage[13] = fileNameByte;
+    returnMessage[14] = fileContentByte;
+
+    // 获取从[控制域]到[校验和]之前的报文
+    StringBuffer tempSB = new StringBuffer("");
+    for (int i = 4; i <= 14; i++) {
+      tempSB.append(returnMessage[i]);
+    }
+    // 计算校验和
+    String crc = IEC102Uitl.makeChecksum(tempSB.toString());
+    returnMessage[15] = crc;
+    // 计算报文的帧长
+    String ml = StrUtil.padAfter(ByteUtil.decToHex(tempSB.length() / 2), 4, '0');
+    String mlLow = ml.substring(0, 2);
+    String mlHeight = ml.substring(2);
+    returnMessage[1] = mlLow;
+    returnMessage[2] = mlHeight;
+    String sendMessage = ArrayUtil.join(returnMessage, "");
+    return sendMessage.toUpperCase();
+  }
+
+  public void channelCloseConnect(ChannelHandlerContext ctx, UploadObject uploadObject, UploadFileChannel uploadFileChannel){
+    // 传输帧类型不对,断开通道重启
+    Iterator<Map.Entry<String, FileMutableInteger>> countMap = FileConstant.uploadCountMap.entrySet().iterator();
+    while (countMap.hasNext()){
+      Map.Entry<String, FileMutableInteger> entry = countMap.next();
+      String key = entry.getKey();
+      if (key.contains(uploadObject.getObjectNo() + "@")){
+        countMap.remove();
+      }
+    }
+
+    Iterator<Map.Entry<String, String>> contentMap = FileConstant.fileContentMap.entrySet().iterator();
+    while (contentMap.hasNext()){
+      Map.Entry<String,String> entry = contentMap.next();
+      String key = entry.getKey();
+      if (key.contains(uploadObject.getObjectNo() + "@")){
+        contentMap.remove();
+      }
+    }
+    FileConstant.validateMessage.remove(uploadFileChannel.getId()+"");
+    ctx.close();
+  }
+}

+ 276 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/ServerFor102TransitHandler.java

@@ -0,0 +1,276 @@
+package com.jiayue.ipfcst.fileupload.IEC102;
+
+import ch.qos.logback.classic.Logger;
+import com.jiayue.ipfcst.common.core.util.SpringContextHolder;
+import com.jiayue.ipfcst.common.data.entity.ElectricField;
+import com.jiayue.ipfcst.common.data.entity.UploadFileChannel;
+import com.jiayue.ipfcst.common.data.entity.UploadObject;
+import com.jiayue.ipfcst.console.service.ChannelDisconLogService;
+import com.jiayue.ipfcst.console.service.ElectricFieldService;
+import com.jiayue.ipfcst.fileupload.config.AppenderFactory;
+import com.jiayue.ipfcst.fileupload.service.UploadFileChannelService;
+import com.jiayue.ipfcst.fileupload.service.UploadObjectService;
+import com.jiayue.ipfcst.fileupload.util.FileConstant;
+import com.jiayue.ipfcst.fileupload.util.FileMutableInteger;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * netty服务端中转处理类
+ *
+ * @author xsl
+ * @version 3.0
+ */
+public class ServerFor102TransitHandler extends ChannelInboundHandlerAdapter {
+  @Autowired
+  UploadFileChannelService uploadFileChannelService = SpringContextHolder.getApplicationContext().getBean(UploadFileChannelService.class);
+  @Autowired
+  ElectricFieldService electricFieldService = SpringContextHolder.getApplicationContext().getBean(ElectricFieldService.class);
+  @Autowired
+  UploadObjectService uploadObjectService = SpringContextHolder.getApplicationContext().getBean(UploadObjectService.class);
+  @Autowired
+  ServerFor102StandardService serverFor102StandardService = SpringContextHolder.getApplicationContext().getBean(ServerFor102StandardService.class);
+  @Autowired
+  ServerFor102NMService serverFor102NMService = SpringContextHolder.getApplicationContext().getBean(ServerFor102NMService.class);
+  @Autowired
+  ChannelDisconLogService channelDisconLogService = SpringContextHolder.getApplicationContext().getBean(ChannelDisconLogService.class);
+  @Autowired
+  AppenderFactory appenderFactory = SpringContextHolder.getApplicationContext().getBean(AppenderFactory.class);
+
+  private int readerIdleTime;
+
+  ServerFor102TransitHandler(Integer readerIdleTime) {
+    this.readerIdleTime = readerIdleTime;
+  }
+
+  private UploadFileChannel uploadFileChannel = null;
+
+  private UploadObject uploadObject = null;
+
+  private Logger uploadLogger = null;
+
+  /**
+   * 客户端连接会触发
+   */
+  @Override
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
+    // 获取远程IP
+    String remoteIP = insocket.getAddress().getHostAddress();
+    // 根据远端IP获取上报对象报文日志
+    List<UploadFileChannel> uploadFileChannelList = uploadFileChannelService.get();
+    List<UploadFileChannel> uploadFileChannels = uploadFileChannelList.stream().filter(s -> s.getRemoteIp().equals(remoteIP)).collect(Collectors.toList());
+    uploadFileChannel = uploadFileChannels.get(0);
+    List<UploadObject> uploadObjectList = uploadObjectService.get();
+    uploadObject = uploadObjectList.stream().filter(s -> s.getId().equals(uploadFileChannel.getUploadObjectId())).collect(Collectors.toList()).get(0);
+    uploadLogger = appenderFactory.getLogger(uploadObject.getUploadObjectName(), uploadObject.getUploadProtocolEnum().getMessage(),uploadObject.getObjectNo());
+    if (FileConstant.channelIpPortMap.get(remoteIP)!=null){
+      uploadLogger.info(remoteIP+":"+ FileConstant.channelIpPortMap.get(remoteIP) + "已在交互,新连接"+remoteIP+":"+insocket.getPort()+"不能再次连接");
+      ctx.close();
+    }
+    else{
+      uploadLogger.info(ctx.channel().remoteAddress() + "接入通道");
+      // 设置通道状态正常
+      FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "1");
+      FileConstant.channelIpPortMap.put(remoteIP, insocket.getPort());
+      List<ChannelDisconLog> channelDisconLogList = channelDisconLogService.get(String.valueOf(uploadFileChannel.getId()));
+      if (channelDisconLogList.size()>0){
+        channelDisconLogList.sort(Comparator.comparing(ChannelDisconLog::getCreateTime).reversed());
+        Long reconnectTime = new Date().getTime();
+        ChannelDisconLog channelDisconLog = channelDisconLogList.get(0);
+        channelDisconLog.setReconnectTime(reconnectTime);
+        // 计算时长
+        Long durationL = reconnectTime-channelDisconLog.getDisconnectTime();
+        if (durationL/1000>=60){
+          channelDisconLog.setDuration(String.valueOf(durationL/(1000*60))+"分");
+        }
+        else{
+          channelDisconLog.setDuration(String.valueOf(durationL/1000)+"秒");
+        }
+        channelDisconLogService.save(channelDisconLog);
+      }
+    }
+  }
+
+  /**
+   * 客户端发消息会触发
+   */
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    // 设置通道状态正常
+    FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "1");
+    // 获取场站信息
+    ElectricField electricField = electricFieldService.get();
+    // 返回的报文
+    String receiveMessage = "";
+    String province = electricField.getProvinceEnum().name().toString();
+    if ("E15".equals(province)) {
+      // 蒙东102服务端
+      receiveMessage = serverFor102NMService.handlerMessage(msg.toString(), uploadObject, uploadFileChannel, electricField, uploadLogger,ctx);
+    } else {
+      // 标准102服务端
+      receiveMessage = serverFor102StandardService.handlerMessage(msg.toString(), uploadObject, uploadFileChannel, electricField, uploadLogger,ctx);
+    }
+    ctx.channel().writeAndFlush(receiveMessage);
+  }
+
+  /**
+   * 发生异常触发
+   */
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    uploadLogger.error(ctx.channel().remoteAddress() +"102上报异常", cause);
+    Iterator<Map.Entry<String, FileMutableInteger>> countMap = FileConstant.uploadCountMap.entrySet().iterator();
+    while (countMap.hasNext()){
+      Map.Entry<String, FileMutableInteger> entry = countMap.next();
+      String key = entry.getKey();
+      if (key.contains(uploadObject.getObjectNo() + "@")){
+        countMap.remove();
+      }
+    }
+
+    Iterator<Map.Entry<String, String>> contentMap = FileConstant.fileContentMap.entrySet().iterator();
+    while (contentMap.hasNext()){
+      Map.Entry<String,String> entry = contentMap.next();
+      String key = entry.getKey();
+      if (key.contains(uploadObject.getObjectNo() + "@")){
+        contentMap.remove();
+      }
+    }
+    ctx.close();
+  }
+
+  /**
+   * 通道断开触发
+   */
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
+    String remoteIP = insocket.getAddress().getHostAddress();
+    Integer remotePort = insocket.getPort();
+    if (FileConstant.channelIpPortMap.get(remoteIP)!=null){
+      if (FileConstant.channelIpPortMap.get(remoteIP).intValue() == remotePort.intValue()) {
+        // 断开的连接是已经交互的连接
+        uploadLogger.info(ctx.channel().remoteAddress() + "通道断开");
+
+        try {
+          Process process = Runtime.getRuntime().exec("ping "+uploadFileChannel.getRemoteIp() + " -c 2");
+          InputStreamReader r = new InputStreamReader(process.getInputStream());
+          LineNumberReader returnData = new LineNumberReader(r);
+
+          String returnMsg="";
+          String line = "";
+          while ((line = returnData.readLine()) != null) {
+            returnMsg += line;
+          }
+
+          if(returnMsg.indexOf("100% loss")!=-1){
+            uploadLogger.info("与 " +uploadFileChannel.getRemoteIp() +" 连接不畅通.");
+          }
+          else{
+            uploadLogger.info("与 " +uploadFileChannel.getRemoteIp() +" 连接畅通.");
+          }
+
+          returnData.close();
+          r.close();
+          process.getInputStream().close();
+          process.getOutputStream().close();
+          process.destroy();
+
+        } catch (IOException e) {
+          uploadLogger.error("ping对方ip执行失败",e);
+        }
+
+        try {
+          String cmd = "netstat -anp | grep "+uploadFileChannel.getLocalPort();
+          Process process = Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
+          process.waitFor();
+          InputStreamReader r = new InputStreamReader(process.getInputStream());
+          LineNumberReader returnData = new LineNumberReader(r);
+
+          String returnMsg="";
+          String line = "";
+          while ((line = returnData.readLine()) != null) {
+            returnMsg += line;
+          }
+          uploadLogger.info("断开后监听本地端口"+uploadFileChannel.getLocalPort()+":"+returnMsg);
+
+          returnData.close();
+          r.close();
+          process.getInputStream().close();
+          process.getOutputStream().close();
+          process.destroy();
+
+        } catch (IOException e) {
+          uploadLogger.error("监听本地端口"+uploadFileChannel.getLocalPort()+"执行失败",e);
+        }
+
+        // 设置通道状态不通
+        FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "0");
+        // 保存通道断开日志
+        ChannelDisconLog channelDisconLog = new ChannelDisconLog();
+        channelDisconLog.setChannelId(String.valueOf(uploadFileChannel.getId()));
+        channelDisconLog.setChannelName(uploadFileChannel.getChannelName());
+        channelDisconLog.setDisconnectTime(new Date().getTime());
+        channelDisconLogService.save(channelDisconLog);
+        Iterator<Map.Entry<String, FileMutableInteger>> countMap = FileConstant.uploadCountMap.entrySet().iterator();
+        while (countMap.hasNext()) {
+          Map.Entry<String, FileMutableInteger> entry = countMap.next();
+          String key = entry.getKey();
+          if (key.contains(uploadObject.getObjectNo() + "@")) {
+            countMap.remove();
+          }
+        }
+
+        Iterator<Map.Entry<String, String>> contentMap = FileConstant.fileContentMap.entrySet().iterator();
+        while (contentMap.hasNext()) {
+          Map.Entry<String, String> entry = contentMap.next();
+          String key = entry.getKey();
+          if (key.contains(uploadObject.getObjectNo() + "@")) {
+            contentMap.remove();
+          }
+        }
+        FileConstant.validateMessage.remove(uploadFileChannel.getId() + "");
+        FileConstant.channelIpPortMap.remove(remoteIP);
+      }
+    }
+    super.channelInactive(ctx);
+  }
+
+  @Override
+  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+    // 设置通道状态不通
+    FileConstant.channelStatusMap.put(uploadObject.getId() + "-" + uploadFileChannel.getId(), "0");
+    // 长时间没收到信息,服务端主动发送确认复位命令
+//    uploadLogger.info("发送报文:[{}秒没收到信息,从站主动发送确认复位]10 00 FF FF FE 16",readerIdleTime);
+//    ctx.channel().writeAndFlush("1000FFFFFE16");
+    uploadLogger.info(ctx.channel().remoteAddress() +"{}秒没收到信息,关闭通道",readerIdleTime);
+    Iterator<Map.Entry<String, FileMutableInteger>> countMap = FileConstant.uploadCountMap.entrySet().iterator();
+    while (countMap.hasNext()){
+      Map.Entry<String, FileMutableInteger> entry = countMap.next();
+      String key = entry.getKey();
+      if (key.contains(uploadObject.getObjectNo() + "@")){
+        countMap.remove();
+      }
+    }
+
+    Iterator<Map.Entry<String, String>> contentMap = FileConstant.fileContentMap.entrySet().iterator();
+    while (contentMap.hasNext()){
+      Map.Entry<String,String> entry = contentMap.next();
+      String key = entry.getKey();
+      if (key.contains(uploadObject.getObjectNo() + "@")){
+        contentMap.remove();
+      }
+    }
+    FileConstant.validateMessage.remove(uploadFileChannel.getId()+"");
+    ctx.close();
+  }
+}

+ 67 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/IEC102/UploadFileNettyServer.java

@@ -0,0 +1,67 @@
+package com.jiayue.ipfcst.fileupload.IEC102;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.InetSocketAddress;
+
+/**
+ * 开启netty服务端
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Slf4j
+public class UploadFileNettyServer extends NettyParent {
+//  private Logger log;
+  private int readerIdleTime;
+  public ChannelFuture future = null;
+  private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+  private final EventLoopGroup workerGroup = new NioEventLoopGroup(1);
+  public String localPort = "";
+  InetSocketAddress socketAddress = null;
+
+  public UploadFileNettyServer(InetSocketAddress socketAddress,Integer readerIdleTime){
+    this.socketAddress = socketAddress;
+    this.readerIdleTime = readerIdleTime;
+  }
+
+  public void start() {
+    ServerBootstrap bootstrap = new ServerBootstrap()
+      .group(bossGroup, workerGroup)
+      .channel(NioServerSocketChannel.class)
+      .childHandler(new ServerFor102InitChannel(readerIdleTime))
+      .localAddress(socketAddress)
+      //设置队列大小
+      .option(ChannelOption.SO_BACKLOG, 1024)
+      // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
+      .childOption(ChannelOption.SO_KEEPALIVE, true);
+    try {
+      this.localPort = String.valueOf(socketAddress.getPort());
+      future = bootstrap.bind(socketAddress).sync();
+      log.info("102服务器启动开始监听端口: {}", socketAddress.getPort());
+    } catch (Exception e) {
+      e.printStackTrace();
+      log.error("开启102服务端口: {}失败", socketAddress.getPort(), e);
+    }
+  }
+
+  /**
+   * 停止服务
+   */
+  public void destroy() {
+    log.info("Shutdown Netty Server...");
+    if (future != null) {
+      future.channel().close();
+    }
+    workerGroup.shutdownGracefully();
+    bossGroup.shutdownGracefully();
+    log.info("Shutdown Netty Server Success!");
+  }
+
+}

+ 118 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/config/AppenderFactory.java

@@ -0,0 +1,118 @@
+package com.jiayue.ipfcst.fileupload.config;
+
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
+import ch.qos.logback.classic.filter.ThresholdFilter;
+import ch.qos.logback.core.rolling.RollingFileAppender;
+import ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy;
+import ch.qos.logback.core.util.FileSize;
+import ch.qos.logback.core.util.OptionHelper;
+import com.jiayue.ipfcst.fileupload.util.FileConstant;
+import com.jiayue.ipfcst.fileupload.util.FileUtil;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * 创建动态日志
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Component
+public class AppenderFactory {
+  @Value("${fileupload.log.level}")
+  private String logLevel;
+  @Value("${fileupload.log.totalSizeCap}")
+  private String totalSizeCap;
+  @Value("${fileupload.log.maxFileSize}")
+  private String maxFileSize;
+  @Value("${fileupload.log.maxHistory}")
+  private String maxHistory;
+
+  public Logger getLogger(String name, String type, String no) {
+    Logger logger = FileConstant.uploadLogMap.get(no);
+    if (logger != null) {
+      return logger;
+    } else {
+      if (logger != null) {
+        return logger;
+      }
+      logger = getAppender(name, type, no);
+      FileConstant.uploadLogMap.put(no, logger);
+    }
+    return logger;
+  }
+
+  /**
+   * 通过传入的名字和级别,动态设置appender
+   *
+   * @param name
+   * @return
+   */
+  public Logger getAppender(String name, String type, String no) {
+//    Level level = Level.toLevel(this.logLevel);
+//    DateFormat format = DateFormat.getDateInstance(DateFormat.MEDIUM, Locale.SIMPLIFIED_CHINESE);
+    LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
+    //这里是可以用来设置appender的,在xml配置文件里面,是这种形式:
+    // <appender name="error" class="ch.qos.logback.core.rolling.RollingFileAppender">
+    RollingFileAppender appender = new RollingFileAppender();
+    //这里设置级别过滤器
+    ThresholdFilter levelFilter = createLevelFilter(this.logLevel);
+    levelFilter.start();
+    appender.addFilter(levelFilter);
+    // 但可以使用<contextName>设置成其他名字,用于区分不同应用程序的记录。一旦设置,不能修改。
+    appender.setContext(context);
+    //appender的name属性
+    appender.setName("upload-" + type + "-" + name + "-" + no);
+    //设置文件名
+//        appender.setFile(OptionHelper.substVars("d:/eppLog/"+ name+"/" + "fileUploadLogFile-" +name+"-"+ type+".%d{yyyy-MM-dd}.%i.log",context));
+    appender.setAppend(true);
+    appender.setPrudent(false);
+    //设置文件创建时间及大小的类
+    SizeAndTimeBasedRollingPolicy policy = new SizeAndTimeBasedRollingPolicy();
+    //文件名格式
+    String fp = OptionHelper.substVars(FileUtil.getLogsPath() + "/" + "%d{yyyy-MM-dd}" + "/" + "upload-" + type + "-" + name + "-" + no + ".%d{yyyy-MM-dd}.%i.log", context);
+    //最大日志文件大小
+    policy.setMaxFileSize(FileSize.valueOf(this.maxFileSize));
+    //设置文件名模式
+    policy.setFileNamePattern(fp);
+    //设置日志文件保留天数
+    policy.setMaxHistory(Integer.parseInt(this.maxHistory));
+    //总大小限制
+    policy.setTotalSizeCap(FileSize.valueOf(this.totalSizeCap));
+    //设置父节点是appender
+    policy.setParent(appender);
+    // 但可以使用<contextName>设置成其他名字,用于区分不同应用程序的记录。一旦设置,不能修改。
+    policy.setContext(context);
+    policy.setCleanHistoryOnStart(true);
+    policy.start();
+    PatternLayoutEncoder encoder = new PatternLayoutEncoder();
+    //设置上下文,每个logger都关联到logger上下文,默认上下文名称为default。
+    // 但可以使用<contextName>设置成其他名字,用于区分不同应用程序的记录。一旦设置,不能修改。
+    encoder.setContext(context);
+    //设置格式
+//    encoder.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n");
+    encoder.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level - %msg%n");
+    encoder.start();
+    //加入下面两个节点
+    appender.setRollingPolicy(policy);
+    appender.setEncoder(encoder);
+    appender.start();
+    Logger logger = context.getLogger("upload-" + type + "-" + name + "-" + no);
+    //设置不向上级打印信息
+    logger.setAdditive(false);
+    logger.addAppender(appender);
+    return logger;
+  }
+
+  private ThresholdFilter createLevelFilter(String level) {
+    ThresholdFilter levelFilter = new ThresholdFilter();
+    levelFilter.setLevel(level);
+//    levelFilter.setOnMatch(ACCEPT);
+//    levelFilter.setOnMismatch(DENY);
+    levelFilter.start();
+    return levelFilter;
+  }
+}

+ 120 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/config/FileUploadConfig.java

@@ -0,0 +1,120 @@
+package com.jiayue.ipfcst.fileupload.config;
+
+import com.jiayue.ipfcst.common.core.util.DateMomentUtil;
+import com.jiayue.ipfcst.common.data.entity.UploadFileLog;
+import com.jiayue.ipfcst.common.data.repository.UploadFileLogRepository;
+import com.jiayue.ipfcst.console.service.SysParameterService;
+import com.jiayue.ipfcst.fileupload.service.UploadFileChannelService;
+import com.jiayue.ipfcst.fileupload.util.FileConstant;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.time.DateFormatUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.core.annotation.Order;
+import org.springframework.data.jpa.domain.Specification;
+import org.springframework.stereotype.Component;
+
+import javax.persistence.criteria.Predicate;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 启动netty服务端
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Component
+@Order(100)
+@Slf4j
+public class FileUploadConfig {
+  @Autowired
+  UploadFileChannelService uploadFileChannelService;
+  @Autowired
+  SysParameterService sysParameterService;
+  @Autowired
+  UploadFileLogRepository uploadFileLogRepository;
+  /**
+   * 开启102s服务端
+   */
+  @Bean
+  public void startIec102Server() {
+    uploadFileChannelService.startAllChannel();
+  }
+
+  /**
+   * 启动将fileupload/process目录里的文件加载到文件缓存变量中
+   */
+  @Bean
+  public void loadUploadFileConstant() {
+    // 查询文件日志数据库,将未上报成功的查询出来
+    Specification<UploadFileLog> specification = this.getQuerySpecification();
+    List<UploadFileLog> uploadFileLogList = uploadFileLogRepository.findAll(specification);
+    for (UploadFileLog uploadFileLog:uploadFileLogList){
+      String readyFileKey = uploadFileLog.getUploadObjectNo()+"@"+uploadFileLog.getFileTypeEnum().toString()+"@"+uploadFileLog.getFileName();
+      FileConstant.readyUploadFileMap.put(readyFileKey, uploadFileLog);
+      FileConstant.fileShouldMomentMap.put(readyFileKey, uploadFileLog.getId());
+    }
+//    try {
+//      Files.walkFileTree(Paths.get(FileUtil.getFileUploadPath() + File.separator + "process"), new SimpleFileVisitor<Path>() {
+//        // 访问文件时自动调用此方法,attrs参数可以获取到文件大小
+//        @Override
+//        public FileVisitResult visitFile(Path vfile, BasicFileAttributes attrs) {
+//          try {
+//            File file = vfile.toFile();
+//            int processIdx = file.getPath().indexOf("process");
+//            if (processIdx > -1) {
+//              String separator = "/|\\\\";
+//              String[] dirs = file.getPath().substring(processIdx).split(separator);
+//              String readyFileKey = dirs[dirs.length - 3].toString() + "-" + dirs[dirs.length - 2].toString() + "-" + dirs[dirs.length - 1].toString();
+//              FileConstant.readyUploadFileMap.put(readyFileKey, file.getPath());
+//              log.info("上报文件:" + file.getPath() + "存入缓存,缓存中数量:" + FileConstant.readyUploadFileMap.size());
+//            }
+//          } catch (Exception e) {
+//            log.error("启动加载上报文件失败:" + vfile.toFile().getPath());
+//          }
+//          return FileVisitResult.CONTINUE;
+//        }
+//
+//        // 访问文件夹之前自动调用此方法
+//        @Override
+//        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
+//          return FileVisitResult.CONTINUE;
+//        }
+//
+//        //访问文件失败时自动调用此方法
+//        @Override
+//        public FileVisitResult visitFileFailed(Path file, IOException e) {
+//          return FileVisitResult.CONTINUE;
+//        }
+//
+//        //访问文件夹之后自动调用此方法
+//        @Override
+//        public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+//          return FileVisitResult.CONTINUE;
+//        }
+//      });
+//    } catch (Exception e) {
+//      log.error("启动加载上报文件到缓存中失败:", e);
+//    }
+  }
+
+  /**
+   * 封装文件日志查询条件
+   * @return
+   */
+  private Specification<UploadFileLog> getQuerySpecification() {
+    return (Specification<UploadFileLog>) (root, criteriaQuery, cb) -> {
+      Long startTime = DateMomentUtil.getDayStartTime(new Date().getTime());
+      // 结束时间(开始加24小时再减去1秒)
+      Long endTime = startTime + 1000 *60 * 60 *24 -1;
+
+      List<Predicate> predicates = new ArrayList<>();
+      predicates.add(cb.equal(root.get("fileStatusEnum").as(String.class), "E1"));
+      predicates.add(cb.greaterThanOrEqualTo(root.get("createTime").as(String.class), DateFormatUtils.format(startTime, "yyyy-MM-dd HH:mm:ss'")));
+      predicates.add(cb.lessThanOrEqualTo(root.get("createTime").as(String.class), DateFormatUtils.format(endTime, "yyyy-MM-dd HH:mm:ss'")));
+      return cb.and(predicates.toArray(new Predicate[predicates.size()]));
+    };
+  }
+}

+ 17 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/dto/Validate102Dto.java

@@ -0,0 +1,17 @@
+package com.jiayue.ipfcst.fileupload.dto;
+
+import lombok.Data;
+
+/**
+ * 校验102Dto
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Data
+public class Validate102Dto {
+  // 接收到报文帧第一个字节
+  public String messageFirst;
+  // 报文对应的判断码,10帧存第二个字节,68帧存传输原因
+  public String messageReason;
+}

+ 88 - 80
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/service/BaseUploadFileService.java

@@ -4,9 +4,12 @@ import com.jiayue.ipfcst.common.core.util.DateTimeUtil;
 import com.jiayue.ipfcst.common.data.constant.enums.AlarmTypeEnum;
 import com.jiayue.ipfcst.common.data.constant.enums.FileStatusEnum;
 import com.jiayue.ipfcst.common.data.constant.enums.FileTypeEnum;
+import com.jiayue.ipfcst.common.data.entity.UploadFileChannel;
 import com.jiayue.ipfcst.common.data.entity.UploadFileLog;
+import com.jiayue.ipfcst.common.data.entity.UploadObject;
 import com.jiayue.ipfcst.common.data.repository.UploadFileLogRepository;
 import com.jiayue.ipfcst.common.data.service.BaseService;
+import com.jiayue.ipfcst.fileupload.util.FileConstant;
 import com.jiayue.ipfcst.fileupload.util.FileUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
@@ -37,6 +40,10 @@ import java.util.stream.Collectors;
 public abstract class BaseUploadFileService extends BaseService {
   @Autowired
   UploadFileLogRepository uploadFileLogRepository;
+  @Autowired
+  UploadObjectService uploadObjectService;
+  @Autowired
+  UploadFileChannelService uploadFileChannelService;
 
   /**
    * 获取文件上报数值类型参数
@@ -114,7 +121,7 @@ public abstract class BaseUploadFileService extends BaseService {
    * @param fileType
    * @param uploadFileEndTime
    */
-  protected void copyUploadFile(StringWriter writer, File file, String fileType, Long uploadFileEndTime, Date createTime) {
+  protected void copyUploadFile(StringWriter writer, File file, String fileType, Long uploadFileEndTime, Date createTime,String stationCode) {
     FileOutputStream os = null;
     try {
       os = new FileOutputStream(file);
@@ -122,7 +129,7 @@ public abstract class BaseUploadFileService extends BaseService {
       os.write(writer.toString().getBytes("UTF-8"));
       os.flush();
       // 将文件复制到上报路径中
-      copyFileToUploadDir(file, fileType, uploadFileEndTime, createTime);
+      copyFileToUploadDir(file, fileType, uploadFileEndTime, createTime,stationCode);
     } catch (IOException e) {
       throw new RuntimeException(e);
     } finally {
@@ -149,83 +156,84 @@ public abstract class BaseUploadFileService extends BaseService {
    * @param uploadFileEndTime 上报文件截止时间
    */
   @Transactional(propagation = Propagation.REQUIRED)
-  protected void copyFileToUploadDir(File file, String fileType, Long uploadFileEndTime, Date createTime) {
-//    String destFileDir = null;
-//
-//    try {
-//      // 获取上报对象
-//      List<UploadObject> uploadObjectList = uploadObjectService.get();
-//      if (uploadObjectList.isEmpty()) {
-//        // 没有有效上报通道进行告警
-//        String errorInfo = "文件生成没有上报对象可用";
-//        String name = file.getName() + "生成失败";
-//        String describe = FileTypeEnum.valueOf(fileType).getMessage();
-//        String solution = "请配置相关上报对象";
-//        super.saveSysAlarm(AlarmTypeEnum.E1, name, describe, errorInfo, solution);
-//      } else {
-//        // 遍历上报对象生成对应的文件
-//        for (UploadObject uploadObject : uploadObjectList) {
-//          List<UploadFileChannel> uploadFileChannelList = uploadFileChannelService.getByObjectId(uploadObject.getId());
-//          // 找出对象下是否有可用的通道
-//          List<UploadFileChannel> filterUploadFileChannelList = uploadFileChannelList.stream().filter(s -> "E1".equals(s.getChannelStatusEnum().toString())).collect(Collectors.toList());
-//          if (filterUploadFileChannelList.size() > 0) {
-//            String[] uploadFileType = uploadObject.getUploadFileType().split(",");
-//            for (int k = 0; k < uploadFileType.length; k++) {
-//              if (uploadFileType[k].equals(fileType)) {
-//                // 上报文件目录
-//                destFileDir = FileUtil.getFileUploadPath() + File.separator + "process" + File.separator + uploadObject.getObjectNo() + File.separator + fileType;
-//                File destDir = new File(destFileDir);
-//                if (!destDir.exists()) {// 如果目录不存在则创建uploadFileEndTime目录
-//                  boolean b = destDir.mkdirs();
-//                  if (!b) // 如果创建失败则抛出异常
-//                    throw new RuntimeException(destFileDir + " 目录创建失败");
-//                }
-//                if (!new File(destDir + File.separator + file.getName()).exists()) {
-//                  FileUtils.copyFileToDirectory(file, destDir);
-//                  UploadFileLog uploadFileLog = new UploadFileLog();
-//                  uploadFileLog.setUploadObjectId(uploadObject.getId());
-//                  uploadFileLog.setUploadObjectName(uploadObject.getUploadObjectName());
-//                  uploadFileLog.setUploadObjectNo(uploadObject.getObjectNo());
-//                  uploadFileLog.setFileName(file.getName());
-//                  uploadFileLog.setFileTypeEnum(FileTypeEnum.valueOf(fileType));
-//                  uploadFileLog.setFileStatusEnum(FileStatusEnum.E1);
-//                  uploadFileLog.setUploadProtocolEnum(uploadObject.getUploadProtocolEnum());
-//                  uploadFileLog.setUploadCounter(0);
-//                  uploadFileLog.setUploadFileEndTime(uploadFileEndTime);
-//
-//                  uploadFileLog = this.uploadFileLogRepository.save(uploadFileLog);
-//
-//                  if (createTime != null) {
-//                    uploadFileLog.setCreateTime(createTime);
-//                    Date date = new Date();
-//                    String format = DateFormatUtils.format(date, "yyyy-MM-dd HH:mm:ss");
-//                    uploadFileLog.setBackupA(format);
-//                    uploadFileLog = this.uploadFileLogRepository.save(uploadFileLog);
-//                  }
-//                  String readyFileKey = uploadObject.getObjectNo() + "@" + fileType + "@" + file.getName();
-//                  FileConstant.fileShouldMomentMap.put(readyFileKey, uploadFileLog.getId());
-//                  FileConstant.readyUploadFileMap.put(readyFileKey, uploadFileLog);
-//
-//                  log.debug("文件生成存入缓存:" + file.getName());
-//                  log.info("上报对象编号:" + uploadObject.getObjectNo() + ",生成文件" + file.getName() + "成功");
-//                } else {
-//                  log.info("本地文件已经生成,上报对象编号:" + uploadObject.getObjectNo() + "," + file.getName() + "不再生成了");
-//                }
-//                break;
-//              }
-//            }
-//          }
-//        }
-//      }
-//    } catch (Exception e) {
-//      // 文件复制失败进行告警
-//      String errorInfo = "复制文件[" + file.getName() + "]到上报目录[" + destFileDir + "]失败";
-//      log.error(errorInfo, e);
-//      // 进行告警
-//      String name = file.getName() + "生成失败";
-//      String describe = FileTypeEnum.valueOf(fileType).getMessage();
-//      String solution = "请查看日志异常信息";
-//      super.saveSysAlarm(AlarmTypeEnum.E1, name, describe, errorInfo, solution);
-//    }
+  protected void copyFileToUploadDir(File file, String fileType, Long uploadFileEndTime, Date createTime,String stationCode) {
+    String destFileDir = null;
+
+    try {
+      // 获取上报对象
+      List<UploadObject> uploadObjectList = uploadObjectService.get();
+      if (uploadObjectList.isEmpty()) {
+        // 没有有效上报通道进行告警
+        String errorInfo = "文件生成没有上报对象可用";
+        String name = file.getName() + "生成失败";
+        String describe = FileTypeEnum.valueOf(fileType).getMessage();
+        String solution = "请配置相关上报对象";
+      } else {
+        // 遍历上报对象生成对应的文件
+        for (UploadObject uploadObject : uploadObjectList) {
+          if (uploadObject.getStationCode().equals(stationCode)){
+            List<UploadFileChannel> uploadFileChannelList = uploadFileChannelService.getByObjectId(uploadObject.getId());
+            // 找出对象下是否有可用的通道
+            List<UploadFileChannel> filterUploadFileChannelList = uploadFileChannelList.stream().filter(s -> "E1".equals(s.getChannelStatusEnum().toString())).collect(Collectors.toList());
+            if (filterUploadFileChannelList.size()>0){
+              String[] uploadFileType = uploadObject.getUploadFileType().split(",");
+              for (int k = 0; k < uploadFileType.length; k++) {
+                if (uploadFileType[k].equals(fileType)) {
+                  // 上报文件目录
+                  destFileDir = FileUtil.getFileUploadPath() + File.separator + stationCode+File.separator+"process" + File.separator + uploadObject.getObjectNo() + File.separator + fileType;
+                  File destDir = new File(destFileDir);
+                  if (!destDir.exists()) {// 如果目录不存在则创建uploadFileEndTime目录
+                    boolean b = destDir.mkdirs();
+                    if (!b) // 如果创建失败则抛出异常
+                      throw new RuntimeException(destFileDir + " 目录创建失败");
+                  }
+                  if (!new File(destDir+File.separator+file.getName()).exists()){
+                    FileUtils.copyFileToDirectory(file, destDir);
+                    UploadFileLog uploadFileLog = new UploadFileLog();
+                    uploadFileLog.setUploadObjectId(uploadObject.getId());
+                    uploadFileLog.setUploadObjectName(uploadObject.getUploadObjectName());
+                    uploadFileLog.setUploadObjectNo(uploadObject.getObjectNo());
+                    uploadFileLog.setFileName(file.getName());
+                    uploadFileLog.setFileTypeEnum(FileTypeEnum.valueOf(fileType));
+                    uploadFileLog.setFileStatusEnum(FileStatusEnum.E1);
+                    uploadFileLog.setUploadProtocolEnum(uploadObject.getUploadProtocolEnum());
+                    uploadFileLog.setUploadCounter(0);
+                    uploadFileLog.setUploadFileEndTime(uploadFileEndTime);
+
+                    uploadFileLog = this.uploadFileLogRepository.save(uploadFileLog);
+
+                    if(createTime!=null){
+                      uploadFileLog.setCreateTime(createTime);
+                      Date date = new Date();
+                      String format = DateFormatUtils.format(date, "yyyy-MM-dd HH:mm:ss");
+                      uploadFileLog.setBackupA(format);
+                      uploadFileLog = this.uploadFileLogRepository.save(uploadFileLog);
+                    }
+                    String readyFileKey = uploadObject.getObjectNo() + "@" + fileType + "@" + file.getName();
+                    FileConstant.fileShouldMomentMap.put(readyFileKey, uploadFileLog.getId());
+                    FileConstant.readyUploadFileMap.put(readyFileKey, uploadFileLog);
+
+                    log.debug("文件生成存入缓存:"+file.getName());
+                    log.info("上报对象编号:" + uploadObject.getObjectNo() + ",生成文件" + file.getName() + "成功");
+                  }
+                  else{
+                    log.info("本地文件已经生成,上报对象编号:" + uploadObject.getObjectNo() + "," + file.getName() + "不再生成了");
+                  }
+                  break;
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      // 文件复制失败进行告警
+      String errorInfo = "复制文件[" + file.getName() + "]到上报目录[" + destFileDir + "]失败";
+      log.error(errorInfo, e);
+      // 进行告警
+      String name = file.getName() + "生成失败";
+      String describe = FileTypeEnum.valueOf(fileType).getMessage();
+      String solution = "请查看日志异常信息";
+    }
   }
 }

+ 223 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/service/UploadFileChannelService.java

@@ -0,0 +1,223 @@
+package com.jiayue.ipfcst.fileupload.service;
+
+import ch.qos.logback.classic.Logger;
+import cn.hutool.core.util.StrUtil;
+import com.jiayue.ipfcst.common.data.constant.enums.ChannelStatusEnum;
+import com.jiayue.ipfcst.common.data.entity.UploadFileChannel;
+import com.jiayue.ipfcst.common.data.entity.UploadObject;
+import com.jiayue.ipfcst.common.data.repository.UploadFileChannelRepository;
+import com.jiayue.ipfcst.common.data.repository.UploadURLRepository;
+import com.jiayue.ipfcst.fileupload.IEC102.UploadFileNettyServer;
+import com.jiayue.ipfcst.fileupload.config.AppenderFactory;
+import com.jiayue.ipfcst.fileupload.util.FileConstant;
+import lombok.extern.slf4j.Slf4j;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.search.Attribute;
+import net.sf.ehcache.search.Query;
+import net.sf.ehcache.search.Result;
+import net.sf.ehcache.search.Results;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.cache.ehcache.EhCacheCacheManager;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * 上报通道业务层
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Service
+@Slf4j
+public class UploadFileChannelService {
+  @Autowired
+  UploadFileChannelRepository uploadFileChannelRepository;
+  @Autowired
+  EhCacheCacheManager ehCacheCacheManager;
+  @Autowired
+  AppenderFactory appenderFactory;
+  @Autowired
+  UploadObjectService uploadObjectService;
+  @Autowired
+  UploadURLRepository uploadURLRepository;
+
+  @Value("${fileupload.nettyServer.readerIdleTime}")
+  private String readerIdleTime;
+
+  /**
+   * 保存通道
+   *
+   * @param uploadFileChannel 通道信息
+   */
+  @Transactional(propagation = Propagation.SUPPORTS)
+  public UploadFileChannel save(UploadFileChannel uploadFileChannel) {
+    // 获取上报对象协议类型
+    List<UploadObject> uploadObjectList = uploadObjectService.get();
+    UploadObject uploadObject = uploadObjectList.stream().filter(s -> s.getId().equals(uploadFileChannel.getUploadObjectId())).collect(Collectors.toList()).get(0);
+    if (!"ftp".equals(uploadObject.getUploadProtocolEnum().getCode()) && !"sftp".equals(uploadObject.getUploadProtocolEnum().getCode())) {
+      if (uploadFileChannel.getId() != null) {
+        // 102服务端,将原有的服务先销毁掉
+        if (FileConstant.nettyInstanceMap.get(uploadFileChannel.getId()) != null) {
+          FileConstant.nettyInstanceMap.get(uploadFileChannel.getId()).destroy();
+        }
+      }
+    }
+
+    UploadFileChannel uploadFileChannel1 = uploadFileChannelRepository.saveAndFlush(uploadFileChannel);
+
+    // 创建通道对应的日志
+    Logger logger = appenderFactory.getLogger(uploadObject.getUploadObjectName(), uploadObject.getUploadProtocolEnum().getMessage(), uploadObject.getObjectNo());
+
+    String protocol = uploadObject.getUploadProtocolEnum().getCode();
+
+    if ("102s".equals(protocol)) {
+      if (FileConstant.nettyInstanceMap.get(uploadFileChannel.getLocalIp() + ":" + uploadFileChannel.getLocalPort()) == null) {
+        // 创建netty服务
+        UploadFileNettyServer uploadFileNettyServer = new UploadFileNettyServer(new InetSocketAddress(uploadFileChannel.getLocalIp(), Integer.parseInt(uploadFileChannel.getLocalPort())), Integer.parseInt(readerIdleTime));
+        // 开启102服务端
+        uploadFileNettyServer.start();
+        // 将netty服务端实例保存到缓存变量,用于关闭通道时用。
+        FileConstant.nettyInstanceMap.put(uploadFileChannel.getLocalIp() + ":" + uploadFileChannel.getLocalPort(), uploadFileNettyServer);
+      }
+    }
+    clearChannelCache();
+    return uploadFileChannel1;
+  }
+
+  /**
+   * 删除通道信息
+   *
+   * @param uploadFileChannel 通道信息
+   */
+  @Transactional(propagation = Propagation.SUPPORTS)
+  public void delete(UploadFileChannel uploadFileChannel) {
+    // 获取上报对象协议类型
+    List<UploadObject> uploadObjectList = uploadObjectService.get();
+    UploadObject uploadObject = uploadObjectList.stream().filter(s -> s.getId().equals(uploadFileChannel.getUploadObjectId())).collect(Collectors.toList()).get(0);
+    if (!"ftp".equals(uploadObject.getUploadProtocolEnum().getCode()) && !"sftp".equals(uploadObject.getUploadProtocolEnum().getCode())) {
+      String protocol = uploadObject.getUploadProtocolEnum().getCode();
+      if ("102s".equals(protocol)) {
+        String tempLocalPort = uploadFileChannel.getLocalPort();
+        String tempLocalIp = uploadFileChannel.getLocalIp();
+        List<UploadFileChannel> tempList = uploadFileChannelRepository.findAll();
+        // 过滤出是否存在多个IP和端口相同的102服务通道
+        List<UploadFileChannel> filterList = tempList.stream().filter(s -> s.getLocalIp().equals(tempLocalIp) && s.getLocalPort().equals(tempLocalPort) && StrUtil.hasEmpty(s.getUploadUserName())).collect(Collectors.toList());
+        if (filterList.size() == 1) {
+          // 将netty服务端实例保存到缓存变量,用于关闭通道时用。
+          if (FileConstant.nettyInstanceMap.get(uploadFileChannel.getLocalIp() + ":" + uploadFileChannel.getLocalPort()) != null) {
+            FileConstant.nettyInstanceMap.get(uploadFileChannel.getLocalIp() + ":" + uploadFileChannel.getLocalPort()).destroy();
+          }
+        }
+      }
+    }
+
+    clearChannelCache();
+    uploadFileChannelRepository.delete(uploadFileChannel);
+  }
+
+  /**
+   * 根据ObjectId删除通道
+   */
+  @Transactional()
+  public void deleteByObjId(Integer objId) {
+    uploadFileChannelRepository.deleteAllByUploadObjectId(objId);
+    uploadURLRepository.deleteByUploadObjectId(objId);
+    this.clearChannelCache();
+  }
+
+  /**
+   * 移除缓存中的通道信息
+   */
+  public void clearChannelCache() {
+    List<UploadFileChannel> uploadFileChannelList = this.get();
+    Cache cache = ehCacheCacheManager.getCacheManager().getCache("searchablecache");
+    for (UploadFileChannel uploadFileChannel : uploadFileChannelList) {
+      cache.remove("ufc" + uploadFileChannel.getId());
+    }
+  }
+
+  public List<UploadFileChannel> getByObjectId(Integer id) {
+    List<UploadFileChannel> uploadFileChannelList = uploadFileChannelRepository.findAllByUploadObjectId(id);
+    return uploadFileChannelList;
+  }
+
+  public UploadFileChannel getById(Integer id) {
+    Optional<UploadFileChannel> channel = uploadFileChannelRepository.findById(id);
+    return channel.get();
+  }
+
+  /**
+   * 查询通道信息
+   *
+   * @return 通道信息
+   */
+  public List<UploadFileChannel> get() {
+    Cache cache = ehCacheCacheManager.getCacheManager().getCache("searchablecache");
+    Query query = cache.createQuery();
+    //查询结果中包含Key和value
+    query.includeKeys().includeValues();
+    Attribute<String> keyName = cache.getSearchAttribute("key");
+    query.addCriteria(keyName.ilike("*ufc*"));
+    Results results = query.execute();
+    List<Result> resultList = results.all();
+
+    List<UploadFileChannel> list = new ArrayList();
+    if (resultList != null && !resultList.isEmpty()) {
+      for (Result result : resultList) {
+        UploadFileChannel e = (UploadFileChannel) result.getValue();
+        list.add(e);
+      }
+      results.discard();
+    } else {
+      //查询数据库
+      list = uploadFileChannelRepository.findAll();
+      for (UploadFileChannel uploadFileChannel : list) {
+        //将结果存入缓存
+        cache.put(new Element("ufc" + uploadFileChannel.getId(), uploadFileChannel));
+      }
+    }
+    return list;
+  }
+
+  /**
+   * 开启102所有服务端
+   */
+  public void startAllChannel() {
+    List<UploadFileChannel> uploadFileChannels = get();
+    // 取通道状态正常并且是102s的类型
+    List<UploadFileChannel> useChannelList = uploadFileChannels.stream().filter(s -> s.getChannelStatusEnum().equals(ChannelStatusEnum.E1)).collect(Collectors.toList());
+    for (UploadFileChannel u : useChannelList) {
+      // 获取上报对象协议类型
+      List<UploadObject> uploadObjectList = uploadObjectService.get();
+      UploadObject uploadObject = uploadObjectList.stream().filter(s -> s.getId().equals(u.getUploadObjectId())).collect(Collectors.toList()).get(0);
+
+      if (!"ftp".equals(uploadObject.getUploadProtocolEnum().getCode()) && !"sftp".equals(uploadObject.getUploadProtocolEnum().getCode())) {
+        // 创建通道对应的日志
+        Logger logger = appenderFactory.getLogger(uploadObject.getUploadObjectName(), uploadObject.getUploadProtocolEnum().getMessage(), uploadObject.getObjectNo());
+        if (FileConstant.nettyInstanceMap.get(u.getId()) != null) {
+          FileConstant.nettyInstanceMap.remove(u.getId());
+        }
+
+        if ("102s".equals(uploadObject.getUploadProtocolEnum().getCode())) {
+          if (FileConstant.nettyInstanceMap.get(u.getLocalIp() + ":" + u.getLocalPort()) == null) {
+            // 创建netty服务
+            UploadFileNettyServer uploadFileNettyServer = new UploadFileNettyServer(new InetSocketAddress(u.getLocalIp(), Integer.parseInt(u.getLocalPort())), Integer.parseInt(readerIdleTime));
+            // 开启102服务端
+            uploadFileNettyServer.start();
+            // 将netty服务端实例保存到缓存变量,用于关闭通道时用。
+            FileConstant.nettyInstanceMap.put(u.getLocalIp() + ":" + u.getLocalPort(), uploadFileNettyServer);
+          }
+        }
+      }
+    }
+  }
+}

+ 114 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/service/UploadObjectService.java

@@ -0,0 +1,114 @@
+package com.jiayue.ipfcst.fileupload.service;
+
+import com.jiayue.ipfcst.common.core.exception.BusinessException;
+import com.jiayue.ipfcst.common.data.entity.ElectricField;
+import com.jiayue.ipfcst.common.data.entity.UploadObject;
+import com.jiayue.ipfcst.common.data.repository.UploadObjectRepository;
+import com.jiayue.ipfcst.common.data.service.BaseService;
+import lombok.extern.slf4j.Slf4j;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.search.Attribute;
+import net.sf.ehcache.search.Query;
+import net.sf.ehcache.search.Result;
+import net.sf.ehcache.search.Results;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.ehcache.EhCacheCacheManager;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 上报对象业务层
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Service
+@Slf4j
+public class UploadObjectService extends BaseService {
+  @Autowired
+  UploadObjectRepository uploadObjectRepository;
+  @Autowired
+  EhCacheCacheManager ehCacheCacheManager;
+
+  /**
+   * 保存上报对象
+   *
+   * @param uploadObject 上报对象信息
+   */
+  @Transactional(propagation = Propagation.SUPPORTS)
+  public UploadObject save(UploadObject uploadObject) {
+    clearUploadObjectCache();
+    return uploadObjectRepository.save(uploadObject);
+  }
+
+  @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
+  public void saveCloud(List<UploadObject> beans) {
+    if(beans != null && beans.size() > 0){
+      clearUploadObjectCache();
+      uploadObjectRepository.deleteAll();
+      uploadObjectRepository.saveAll(beans);
+    }
+  }
+
+  /**
+   * 删除上报对象
+   *
+   * @param id 上报对象信息
+   */
+  @Transactional(propagation = Propagation.SUPPORTS)
+  public void delete(Integer id) {
+    clearUploadObjectCache();
+    uploadObjectRepository.deleteById(id);
+  }
+
+  /**
+   * 移除缓存中的上报对象信息
+   */
+  public void clearUploadObjectCache() {
+    List<UploadObject> uploadObjectList = this.get();
+    Cache cache = ehCacheCacheManager.getCacheManager().getCache("searchablecache");
+    for (UploadObject uploadObject : uploadObjectList) {
+      cache.remove("uo" + uploadObject.getId());
+    }
+  }
+
+  /**
+   * 查询上报对象信息
+   *
+   * @return 上报对象信息
+   */
+  public List<UploadObject> get() {
+    Cache cache = ehCacheCacheManager.getCacheManager().getCache("searchablecache");
+    Query query = cache.createQuery();
+    //查询结果中包含Key和value
+    query.includeKeys().includeValues();
+    Attribute<String> keyName = cache.getSearchAttribute("key");
+    query.addCriteria(keyName.ilike("*uo*"));
+    Results results = query.execute();
+    List<Result> resultList = results.all();
+
+    List<UploadObject> list = new ArrayList();
+    if (resultList != null && !resultList.isEmpty()) {
+      for (Result result : resultList) {
+        UploadObject e = (UploadObject) result.getValue();
+        list.add(e);
+      }
+      results.discard();
+    } else {
+      //查询数据库
+      list = uploadObjectRepository.findAll();
+      for (UploadObject uploadObject : list) {
+        //将结果存入缓存
+        cache.put(new Element("uo" + uploadObject.getId(), uploadObject));
+      }
+    }
+    return list;
+  }
+}

+ 48 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/util/FileConstant.java

@@ -0,0 +1,48 @@
+package com.jiayue.ipfcst.fileupload.util;
+
+import ch.qos.logback.classic.Logger;
+import com.jiayue.ipfcst.common.data.entity.UploadFileLog;
+import com.jiayue.ipfcst.fileupload.IEC102.NettyParent;
+import com.jiayue.ipfcst.fileupload.dto.Validate102Dto;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+/**
+ * 上报文件公用变量
+ *
+ * @author xsl
+ * @version 3.0
+ */
+@Slf4j
+public class FileConstant {
+  /*待上报文件的缓存变量,Map<上报对象编号-文件类型-文件名,文件本地路径及文件名>*/
+  public static Map<String, UploadFileLog> readyUploadFileMap = new Hashtable<String, UploadFileLog>();
+  /*文件上报次数累计的缓存变量,Map<上报对象编号-文件类型-文件名,上报次数>*/
+  public static Map<String, FileMutableInteger> uploadCountMap = new Hashtable<String, FileMutableInteger>();
+  /*文件内容缓存变量,Map<上报对象编号-文件类型-文件名,16进制文件内容>*/
+  public static Map<String, String> fileContentMap = new Hashtable<String, String>();
+  /*文件内容及内容位置缓存变量,Map<上报对象编号-文件类型-文件名,map<16进制文件内容,内容在原来来文件的位置>>*/
+  public static Map<String, Map<String,String>> fileContentByIdxMap = new Hashtable<String, Map<String,String>>();
+  /*netty服务实例集合<通道主键ID,netty服务端实例>*/
+  public static Map<String, NettyParent> nettyInstanceMap = new Hashtable<String,NettyParent>();
+  /*通道状态集合<上报对象id-通道id,通讯状态> 通讯状态:1通0不通*/
+  public static Map<String,String> channelStatusMap = new Hashtable<String,String>();
+  /*下载文件内容缓存变量,Map<上报对象编号-文件类型,16进制文件内容>*/
+  public static Map<String, String> downLoadFileContentMap = new Hashtable<String, String>();
+  public static Map<String, Logger> uploadLogMap = new HashMap<>();
+  /*验证对方报文回复正确性<通道主键ID,下一次交互的传输原因>*/
+  public static Map<String,Validate102Dto> validateMessage = new HashMap<>();
+  /*遇到不正常回复时,将文件暂时放入再次上报变量,Map<上报对象编号-文件类型-文件名,"">*/
+  public static Map<String, String> againUploadFileMap = new Hashtable<String, String>();
+  /*文件对应的生成时间,Map<上报对象编号-文件类型-文件名,日志表主键ID>*/
+  public static Map<String, Integer> fileShouldMomentMap = new Hashtable<String, Integer>();
+  /*文件包次数Map<上报对象编号-文件类型-文件名,包次数>*/
+  public static Map<String, String> filePackageNumMap = new Hashtable<String, String>();
+  /*保存102哪个通道在连接Map<Ip,端口>*/
+  public static Map<String, Integer> channelIpPortMap = new Hashtable<String, Integer>();
+  /*通道状态集合<上报对象id,状态> 通讯状态:1通0不通*/
+  public static Map<String,Long> channelTimeMap = new Hashtable<String,Long>();
+}

+ 38 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/fileupload/util/FileMutableInteger.java

@@ -0,0 +1,38 @@
+package com.jiayue.ipfcst.fileupload.util;
+
+import com.jiayue.ipfcst.common.data.entity.UploadFileLogDetail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 文件上报计数器
+ *
+ * @author xsl
+ * @version 3.0
+ */
+public class FileMutableInteger {
+  int value = 0;
+
+  List<UploadFileLogDetail> uploadFileLogDetail = new ArrayList<>();
+
+  public List<UploadFileLogDetail> getUploadFileLogDetail() {
+    return uploadFileLogDetail;
+  }
+
+  public void setUploadFileLogDetail(List<UploadFileLogDetail> uploadFileLogDetail) {
+    this.uploadFileLogDetail = uploadFileLogDetail;
+  }
+
+  public FileMutableInteger(int val) {
+    this.value = val;
+  }
+
+  public int getValue() {
+    return value;
+  }
+
+  public void setValue(int value) {
+    this.value = value;
+  }
+}