package com.jiayue.ipfcst.client.protocol2file; import com.syjy.DataExchangeException; import com.syjy.container.ProtocolDataContainer; import com.syjy.container.ProtocolTunnelContainer; import com.syjy.container.RecurringTaskContainer; import com.syjy.tunnelinfo.DataPoint; import com.syjy.tunnelinfo.TunnelStatus; import com.syjy.tunnelworker.BaseProtocolTunnel; import com.syjy.tunnelworker.senders.DataSenderInterface; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import wei.yigulu.netty.BaseProtocolBuilder; import java.io.File; import java.io.IOException; import java.io.PrintStream; import java.util.Comparator; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @author: xiuwei * @version: */ public class GenerateFileTunnel extends BaseProtocolTunnel implements DataSenderInterface { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmmss"); private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(5); /** * 顶层的构造方法 * * @param tunnelInfo 通道信息 */ public GenerateFileTunnel(GenerateFileTunnelInfo tunnelInfo) { super(tunnelInfo); } @Override public BaseProtocolTunnel startTunnel() throws DataExchangeException { setTunnelStatus(TunnelStatus.CONNECTED); protocolTunnelContainer.addUpdateDateTask(this); checkFileIsPassedTask(); return this; } @Override public BaseProtocolTunnel buildTunnel() throws DataExchangeException { return protocolTunnelContainer.addTunnel(this); } @Override public void parseGatherDataPoint() throws DataExchangeException { this.tunnelInfo.getDataPoints().sort(Comparator.comparingInt(DataPoint::getProtocolPoint)); } @Override public void updateData2Protocol() throws DataExchangeException { //获取各通道状态 Map allTunnel = ProtocolTunnelContainer.getInstance().getAllTunnel(); String tunnelStatus = "correct"; for (Map.Entry e : allTunnel.entrySet()) { if (!TunnelStatus.CORRECT_STATUS.contains(e.getValue().getTunnelStatus())) { tunnelStatus = "error"; } } StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append("\n" + "@id value\n"); for (int i = 1; i <= this.tunnelInfo.getDataPoints().size(); i++) { stringBuffer.append("#"); stringBuffer.append(i); stringBuffer.append("\t"); stringBuffer.append(ProtocolDataContainer.getInstance().getNumber(this.tunnelInfo.getDataPoints().get(i - 1).getId())); stringBuffer.append("\n"); } stringBuffer.append(""); stringBuffer.append("\n"); stringBuffer.append(""); stringBuffer.append("\n"); stringBuffer.append("#"); stringBuffer.append(tunnelStatus); stringBuffer.append("\n"); stringBuffer.append(""); String fileName = "CFT" + tunnelInfo.getTunnelId() + "_" + DateTime.now().toString("yyyyMMddHHmmss") + "." + tunnelInfo.getFileType(); try { log.info("向{}{}文件写入数据{}", tunnelInfo.getFilePath(), fileName, stringBuffer.toString()); write2File(tunnelInfo.getFilePath(), fileName, stringBuffer.toString()); pool.schedule(() -> { File fileFind = new File(tunnelInfo.getFilePath() + File.separator + fileName); if (fileFind.exists()) { fileFind.renameTo(new File(tunnelInfo.getFilePath() + File.separator + "CFT" + tunnelInfo.getTunnelId() + "_" + DateTime.now().toString("yyyyMMddHHmmss") + "." + tunnelInfo.getFileType())); } }, 3, TimeUnit.SECONDS); log.info("向{}{}文件写入完成", tunnelInfo.getFilePath(), fileName); } catch (IOException e) { log.error("向{}{}文件写入失败", tunnelInfo.getFilePath(), fileName); e.printStackTrace(); } } /** * 定时检查文件是否传输完成 穿过隔离文件 */ public void checkFileIsPassedTask() { File file = new File(this.tunnelInfo.getFilePath()); if (file.exists() && file.isDirectory()) { RecurringTaskContainer.getInstance().addRecurringTask(60, "查看本通道文件是否被隔离传输成功", () -> { File[] files = file.listFiles(); String fileName = ""; DateTime fileTime; //遍历反向隔离扫描路径 查看是否有本通道生成的文件,如果有一分钟之前本通道生成的文件仍没被传输走,便认为通道被阻断, for (File f : files) { try { fileName = f.getName(); if (fileName.startsWith("CFT" + this.tunnelInfo.getTunnelId())) { if (fileName.contains("_")) { fileTime = DateTime.parse(fileName.substring(fileName.indexOf("_") + 1, fileName.indexOf(".")), DATE_TIME_FORMATTER); if (fileTime.plusMinutes(1).isBeforeNow()) { setTunnelStatus(TunnelStatus.CONNECTEDANDCOMMERROR); log.warn("存在未被反向隔离传输的文件{}",fileName); //TODO 文件滞留处理逻辑 return null; } } } }catch (Exception e){ log.error("检查生成协议数据文件传输情况时发生异常",e); } } setTunnelStatus(TunnelStatus.CONNECTED); return null; }); } } public void write2File(String path, String fileName, String context) throws IOException { boolean f = System.getProperty("os.name").toLowerCase().contains("windows"); if (!f) { String c = "echo \"" + context + "\" >" + path + File.separator + fileName; log.info("执行命令: " + c); String[] command = {"/bin/sh", "-c", c}; try { Runtime.getRuntime().exec(command); } catch (Exception e) { log.warn("发生异常", e); } } else { File directory = new File(path); if (!directory.exists()) { directory.mkdirs(); } File file = new File(path + "/" + fileName); if (!file.exists()) { try { file.createNewFile(); } catch (IOException e) { e.printStackTrace(); } } PrintStream stream = new PrintStream(file); stream.print(context); stream.flush(); stream.close(); } } }