GenerateFileTunnel.java 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package com.jiayue.ipfcst.client.protocol2file;
  2. import com.syjy.DataExchangeException;
  3. import com.syjy.container.ProtocolDataContainer;
  4. import com.syjy.container.ProtocolTunnelContainer;
  5. import com.syjy.container.RecurringTaskContainer;
  6. import com.syjy.tunnelinfo.DataPoint;
  7. import com.syjy.tunnelinfo.TunnelStatus;
  8. import com.syjy.tunnelworker.BaseProtocolTunnel;
  9. import com.syjy.tunnelworker.senders.DataSenderInterface;
  10. import org.joda.time.DateTime;
  11. import org.joda.time.format.DateTimeFormat;
  12. import org.joda.time.format.DateTimeFormatter;
  13. import wei.yigulu.netty.BaseProtocolBuilder;
  14. import java.io.File;
  15. import java.io.IOException;
  16. import java.io.PrintStream;
  17. import java.util.Comparator;
  18. import java.util.Map;
  19. import java.util.concurrent.Executors;
  20. import java.util.concurrent.ScheduledExecutorService;
  21. import java.util.concurrent.TimeUnit;
  22. /**
  23. * @author: xiuwei
  24. * @version:
  25. */
  26. public class GenerateFileTunnel extends BaseProtocolTunnel<GenerateFileTunnelInfo, BaseProtocolBuilder> implements DataSenderInterface {
  27. private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmmss");
  28. private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
  29. /**
  30. * 顶层的构造方法
  31. *
  32. * @param tunnelInfo 通道信息
  33. */
  34. public GenerateFileTunnel(GenerateFileTunnelInfo tunnelInfo) {
  35. super(tunnelInfo);
  36. }
  37. @Override
  38. public BaseProtocolTunnel startTunnel() throws DataExchangeException {
  39. setTunnelStatus(TunnelStatus.CONNECTED);
  40. protocolTunnelContainer.addUpdateDateTask(this);
  41. checkFileIsPassedTask();
  42. return this;
  43. }
  44. @Override
  45. public BaseProtocolTunnel buildTunnel() throws DataExchangeException {
  46. return protocolTunnelContainer.addTunnel(this);
  47. }
  48. @Override
  49. public void parseGatherDataPoint() throws DataExchangeException {
  50. this.tunnelInfo.getDataPoints().sort(Comparator.comparingInt(DataPoint::getProtocolPoint));
  51. }
  52. @Override
  53. public void updateData2Protocol() throws DataExchangeException {
  54. //获取各通道状态
  55. Map<String, BaseProtocolTunnel> allTunnel = ProtocolTunnelContainer.getInstance().getAllTunnel();
  56. String tunnelStatus = "correct";
  57. for (Map.Entry<String, BaseProtocolTunnel> e : allTunnel.entrySet()) {
  58. if (!TunnelStatus.CORRECT_STATUS.contains(e.getValue().getTunnelStatus())) {
  59. tunnelStatus = "error";
  60. }
  61. }
  62. StringBuffer stringBuffer = new StringBuffer();
  63. stringBuffer.append("<Data date=" + DATE_TIME_FORMATTER.print(DateTime.now()) + ">\n" +
  64. "@id value\n");
  65. for (int i = 1; i <= this.tunnelInfo.getDataPoints().size(); i++) {
  66. stringBuffer.append("#");
  67. stringBuffer.append(i);
  68. stringBuffer.append("\t");
  69. stringBuffer.append(ProtocolDataContainer.getInstance().getNumber(this.tunnelInfo.getDataPoints().get(i - 1).getId()));
  70. stringBuffer.append("\n");
  71. }
  72. stringBuffer.append("</Data>");
  73. stringBuffer.append("\n");
  74. stringBuffer.append("<TunnelStatus>");
  75. stringBuffer.append("\n");
  76. stringBuffer.append("#");
  77. stringBuffer.append(tunnelStatus);
  78. stringBuffer.append("\n");
  79. stringBuffer.append("</TunnelStatus>");
  80. String fileName = "CFT" + tunnelInfo.getTunnelId() + "_" + DateTime.now().toString("yyyyMMddHHmmss") + "." + tunnelInfo.getFileType();
  81. try {
  82. log.info("向{}{}文件写入数据{}", tunnelInfo.getFilePath(), fileName, stringBuffer.toString());
  83. write2File(tunnelInfo.getFilePath(), fileName, stringBuffer.toString());
  84. pool.schedule(() -> {
  85. File fileFind = new File(tunnelInfo.getFilePath() + File.separator + fileName);
  86. if (fileFind.exists()) {
  87. fileFind.renameTo(new File(tunnelInfo.getFilePath() + File.separator + "CFT" + tunnelInfo.getTunnelId() + "_" + DateTime.now().toString("yyyyMMddHHmmss") + "." + tunnelInfo.getFileType()));
  88. }
  89. }, 3, TimeUnit.SECONDS);
  90. log.info("向{}{}文件写入完成", tunnelInfo.getFilePath(), fileName);
  91. } catch (IOException e) {
  92. log.error("向{}{}文件写入失败", tunnelInfo.getFilePath(), fileName);
  93. e.printStackTrace();
  94. }
  95. }
  96. /**
  97. * 定时检查文件是否传输完成 穿过隔离文件
  98. */
  99. public void checkFileIsPassedTask() {
  100. File file = new File(this.tunnelInfo.getFilePath());
  101. if (file.exists() && file.isDirectory()) {
  102. RecurringTaskContainer.getInstance().addRecurringTask(60, "查看本通道文件是否被隔离传输成功", () -> {
  103. File[] files = file.listFiles();
  104. String fileName = "";
  105. DateTime fileTime;
  106. //遍历反向隔离扫描路径 查看是否有本通道生成的文件,如果有一分钟之前本通道生成的文件仍没被传输走,便认为通道被阻断,
  107. for (File f : files) {
  108. try {
  109. fileName = f.getName();
  110. if (fileName.startsWith("CFT" + this.tunnelInfo.getTunnelId())) {
  111. if (fileName.contains("_")) {
  112. fileTime = DateTime.parse(fileName.substring(fileName.indexOf("_") + 1, fileName.indexOf(".")), DATE_TIME_FORMATTER);
  113. if (fileTime.plusMinutes(1).isBeforeNow()) {
  114. setTunnelStatus(TunnelStatus.CONNECTEDANDCOMMERROR);
  115. log.warn("存在未被反向隔离传输的文件{}",fileName);
  116. //TODO 文件滞留处理逻辑
  117. return null;
  118. }
  119. }
  120. }
  121. }catch (Exception e){
  122. log.error("检查生成协议数据文件传输情况时发生异常",e);
  123. }
  124. }
  125. setTunnelStatus(TunnelStatus.CONNECTED);
  126. return null;
  127. });
  128. }
  129. }
  130. public void write2File(String path, String fileName, String context) throws IOException {
  131. boolean f = System.getProperty("os.name").toLowerCase().contains("windows");
  132. if (!f) {
  133. String c = "echo \"" + context + "\" >" + path + File.separator + fileName;
  134. log.info("执行命令: " + c);
  135. String[] command = {"/bin/sh", "-c", c};
  136. try {
  137. Runtime.getRuntime().exec(command);
  138. } catch (Exception e) {
  139. log.warn("发生异常", e);
  140. }
  141. } else {
  142. File directory = new File(path);
  143. if (!directory.exists()) {
  144. directory.mkdirs();
  145. }
  146. File file = new File(path + "/" + fileName);
  147. if (!file.exists()) {
  148. try {
  149. file.createNewFile();
  150. } catch (IOException e) {
  151. e.printStackTrace();
  152. }
  153. }
  154. PrintStream stream = new PrintStream(file);
  155. stream.print(context);
  156. stream.flush();
  157. stream.close();
  158. }
  159. }
  160. }