123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- package com.jiayue.ipfcst.client.protocol2file;
- import com.syjy.DataExchangeException;
- import com.syjy.container.ProtocolDataContainer;
- import com.syjy.container.ProtocolTunnelContainer;
- import com.syjy.container.RecurringTaskContainer;
- import com.syjy.tunnelinfo.DataPoint;
- import com.syjy.tunnelinfo.TunnelStatus;
- import com.syjy.tunnelworker.BaseProtocolTunnel;
- import com.syjy.tunnelworker.senders.DataSenderInterface;
- import org.joda.time.DateTime;
- import org.joda.time.format.DateTimeFormat;
- import org.joda.time.format.DateTimeFormatter;
- import wei.yigulu.netty.BaseProtocolBuilder;
- import java.io.File;
- import java.io.IOException;
- import java.io.PrintStream;
- import java.util.Comparator;
- import java.util.Map;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- /**
- * @author: xiuwei
- * @version:
- */
- public class GenerateFileTunnel extends BaseProtocolTunnel<GenerateFileTunnelInfo, BaseProtocolBuilder> implements DataSenderInterface {
- private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmmss");
- private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
- /**
- * 顶层的构造方法
- *
- * @param tunnelInfo 通道信息
- */
- public GenerateFileTunnel(GenerateFileTunnelInfo tunnelInfo) {
- super(tunnelInfo);
- }
- @Override
- public BaseProtocolTunnel startTunnel() throws DataExchangeException {
- setTunnelStatus(TunnelStatus.CONNECTED);
- protocolTunnelContainer.addUpdateDateTask(this);
- checkFileIsPassedTask();
- return this;
- }
- @Override
- public BaseProtocolTunnel buildTunnel() throws DataExchangeException {
- return protocolTunnelContainer.addTunnel(this);
- }
- @Override
- public void parseGatherDataPoint() throws DataExchangeException {
- this.tunnelInfo.getDataPoints().sort(Comparator.comparingInt(DataPoint::getProtocolPoint));
- }
- @Override
- public void updateData2Protocol() throws DataExchangeException {
- //获取各通道状态
- Map<String, BaseProtocolTunnel> allTunnel = ProtocolTunnelContainer.getInstance().getAllTunnel();
- String tunnelStatus = "correct";
- for (Map.Entry<String, BaseProtocolTunnel> e : allTunnel.entrySet()) {
- if (!TunnelStatus.CORRECT_STATUS.contains(e.getValue().getTunnelStatus())) {
- tunnelStatus = "error";
- }
- }
- StringBuffer stringBuffer = new StringBuffer();
- stringBuffer.append("<Data date=" + DATE_TIME_FORMATTER.print(DateTime.now()) + ">\n" +
- "@id value\n");
- for (int i = 1; i <= this.tunnelInfo.getDataPoints().size(); i++) {
- stringBuffer.append("#");
- stringBuffer.append(i);
- stringBuffer.append("\t");
- stringBuffer.append(ProtocolDataContainer.getInstance().getNumber(this.tunnelInfo.getDataPoints().get(i - 1).getId()));
- stringBuffer.append("\n");
- }
- stringBuffer.append("</Data>");
- stringBuffer.append("\n");
- stringBuffer.append("<TunnelStatus>");
- stringBuffer.append("\n");
- stringBuffer.append("#");
- stringBuffer.append(tunnelStatus);
- stringBuffer.append("\n");
- stringBuffer.append("</TunnelStatus>");
- String fileName = "CFT" + tunnelInfo.getTunnelId() + "_" + DateTime.now().toString("yyyyMMddHHmmss") + "." + tunnelInfo.getFileType();
- try {
- log.info("向{}{}文件写入数据{}", tunnelInfo.getFilePath(), fileName, stringBuffer.toString());
- write2File(tunnelInfo.getFilePath(), fileName, stringBuffer.toString());
- pool.schedule(() -> {
- File fileFind = new File(tunnelInfo.getFilePath() + File.separator + fileName);
- if (fileFind.exists()) {
- fileFind.renameTo(new File(tunnelInfo.getFilePath() + File.separator + "CFT" + tunnelInfo.getTunnelId() + "_" + DateTime.now().toString("yyyyMMddHHmmss") + "." + tunnelInfo.getFileType()));
- }
- }, 3, TimeUnit.SECONDS);
- log.info("向{}{}文件写入完成", tunnelInfo.getFilePath(), fileName);
- } catch (IOException e) {
- log.error("向{}{}文件写入失败", tunnelInfo.getFilePath(), fileName);
- e.printStackTrace();
- }
- }
- /**
- * 定时检查文件是否传输完成 穿过隔离文件
- */
- public void checkFileIsPassedTask() {
- File file = new File(this.tunnelInfo.getFilePath());
- if (file.exists() && file.isDirectory()) {
- RecurringTaskContainer.getInstance().addRecurringTask(60, "查看本通道文件是否被隔离传输成功", () -> {
- File[] files = file.listFiles();
- String fileName = "";
- DateTime fileTime;
- //遍历反向隔离扫描路径 查看是否有本通道生成的文件,如果有一分钟之前本通道生成的文件仍没被传输走,便认为通道被阻断,
- for (File f : files) {
- try {
- fileName = f.getName();
- if (fileName.startsWith("CFT" + this.tunnelInfo.getTunnelId())) {
- if (fileName.contains("_")) {
- fileTime = DateTime.parse(fileName.substring(fileName.indexOf("_") + 1, fileName.indexOf(".")), DATE_TIME_FORMATTER);
- if (fileTime.plusMinutes(1).isBeforeNow()) {
- setTunnelStatus(TunnelStatus.CONNECTEDANDCOMMERROR);
- log.warn("存在未被反向隔离传输的文件{}",fileName);
- //TODO 文件滞留处理逻辑
- return null;
- }
- }
- }
- }catch (Exception e){
- log.error("检查生成协议数据文件传输情况时发生异常",e);
- }
- }
- setTunnelStatus(TunnelStatus.CONNECTED);
- return null;
- });
- }
- }
- public void write2File(String path, String fileName, String context) throws IOException {
- boolean f = System.getProperty("os.name").toLowerCase().contains("windows");
- if (!f) {
- String c = "echo \"" + context + "\" >" + path + File.separator + fileName;
- log.info("执行命令: " + c);
- String[] command = {"/bin/sh", "-c", c};
- try {
- Runtime.getRuntime().exec(command);
- } catch (Exception e) {
- log.warn("发生异常", e);
- }
- } else {
- File directory = new File(path);
- if (!directory.exists()) {
- directory.mkdirs();
- }
- File file = new File(path + "/" + fileName);
- if (!file.exists()) {
- try {
- file.createNewFile();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- PrintStream stream = new PrintStream(file);
- stream.print(context);
- stream.flush();
- stream.close();
- }
- }
- }
|