package com.jiayue.biz.job; import cn.hutool.core.thread.ThreadFactoryBuilder; import cn.hutool.json.JSONUtil; import com.jiayue.biz.domain.PointAttribute; import com.jiayue.biz.domain.TunnelInfo; import com.jiayue.biz.domain.WindTowerInfo; import com.jiayue.biz.service.PointAttributeService; import com.jiayue.biz.service.TunnelInfoService; import com.jiayue.biz.service.WindTowerInfoService; import com.jiayue.biz.service.impl.CheckDataRecode; import com.jiayue.biz.service.impl.WindTowerDataParentTableServiceImpl; import com.jiayue.biz.util.CalculationUtil; import com.jiayue.common.utils.DateUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; import wei.yigulu.modbus.domain.FunctionCode; import wei.yigulu.modbus.domain.Obj4RequestRegister; import wei.yigulu.modbus.domain.datatype.IModbusDataType; import wei.yigulu.modbus.domain.datatype.ModbusDataTypeEnum; import wei.yigulu.modbus.domain.datatype.NumericModbusData; import wei.yigulu.modbus.exceptiom.ModbusException; import wei.yigulu.modbus.netty.ModbusTcpMasterBuilder; import wei.yigulu.modbus.utils.ModbusRequestDataUtils; import java.math.BigDecimal; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Service @Slf4j public class ModbusReciveJob { @Autowired TunnelInfoService tunnelInfoService; @Autowired PointAttributeService pointAttributeService; @Autowired WindTowerDataParentTableServiceImpl windTowerDataParentTableService; @Autowired WindTowerInfoService windTowerInfoService; private final ExecutorService calculatorThread = new ThreadPoolExecutor(15, 1000, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder().setNamePrefix("ModbusReciveTaskThread-").build()); private HashMap masterMap = new HashMap<>(); @Bean public void timingTunnel() { List tunnelInfoList = tunnelInfoService.list(); for (TunnelInfo tunnelInfo : tunnelInfoList) { ModbusTcpMasterBuilder master = start(tunnelInfo); masterMap.put(tunnelInfo.getStationId(), master); } } //创建线程 public ModbusTcpMasterBuilder start(TunnelInfo tunnelInfo) { //创建通道后 对通道map 进行 采集数据 //获取场站id 形成map ModbusTcpMasterBuilder master = new ModbusTcpMasterBuilder(tunnelInfo.getIp(), tunnelInfo.getPort()); master.getConfigInfoMap().put("stationId", tunnelInfo.getStationId()); //create 会阻塞线程 calculatorThread.execute(master::create); master.isConnected(); return master; } //关闭通道移除连接 public void stop(String stationId) { if (masterMap.get(stationId).isConnected()) { masterMap.get(stationId).stop(); masterMap.remove(stationId); } } public void tunnel() { List tunnelInfoList = tunnelInfoService.list(); log.info("接入tunnelInfoList通道:{}", JSONUtil.parse(tunnelInfoList)); List attributeList = pointAttributeService.list(); String time = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:00"); log.info("接入modbus时间:{}",time); //TODO 现场测试筛选数据是否能正常入库 // List> mapList = new ArrayList<>(); for (Map.Entry masterBuilderEntry : masterMap.entrySet()) { try { //获取通道map中的key String stationId = masterBuilderEntry.getKey(); //风速层高 HashSet wsHeightSet = new HashSet<>(); //风向层高 HashSet wdHeightSet = new HashSet<>(); //根据通道key获取ip端口号等数据 List tunnelist = tunnelInfoList.stream().filter(t -> t.getStationId().equals(stationId)).collect(Collectors.toList()); TunnelInfo tunnelInfo; if (!tunnelist.isEmpty()) { tunnelInfo = tunnelist.get(0); } else { //如果此通道不存在 关闭通道 this.stop(stationId); continue; } //根据通道key获取点表数据 List pointAttributeList = attributeList.stream().filter(a -> a.getStationId().equals(stationId)).collect(Collectors.toList()); //拿到所有点位 List points = pointAttributeList.stream().map(PointAttribute::getPoint).collect(Collectors.toList()); Map pointTypeMap = new HashMap<>(); for (Integer point : points) { //点位以及点位类型存入map pointTypeMap.put(point, ModbusDataTypeEnum.valueOf(tunnelInfo.getDataFormat())); } //获取连接 ModbusTcpMasterBuilder master = masterBuilderEntry.getValue(); //获取点位信息map Map stringStringBuilderMap = savePoints(pointAttributeList, wsHeightSet, wdHeightSet); WindTowerInfo windTowerInfo = new WindTowerInfo(); //判断是否存在此塔 boolean emptyOfEntity = windTowerInfoService.lambdaQuery().eq(WindTowerInfo::getEquipmentNo, tunnelInfo.getEquipmentNo()).list().isEmpty(); log.info("处理塔{} 存在:{}", tunnelInfo.getEquipmentNo(), emptyOfEntity); if (emptyOfEntity) { StringBuilder wsHeightStr = new StringBuilder(); for (Integer height : wsHeightSet) { wsHeightStr = wsHeightStr.append(height + ","); } String wsHeight = null; if (wsHeightStr.length() > 0) { wsHeight = wsHeightStr.substring(0, wsHeightStr.length() - 1); } StringBuilder wdHeightStr = new StringBuilder(); for (Integer height : wsHeightSet) { wdHeightStr = wdHeightStr.append(height + ","); } String wdHeight = null; if (wdHeightStr.length() > 0) { wdHeight = wdHeightStr.substring(0, wdHeightStr.length() - 1); } //存储测风塔信息 windTowerInfo.setEquipmentNo(tunnelist.get(0).getEquipmentNo()); windTowerInfo.setName(tunnelist.get(0).getStationId()); windTowerInfo.setHeights(wsHeight); windTowerInfo.setWdHeights(wdHeight); windTowerInfo.setType("station"); log.info("处理数据入库结果:{}", JSONUtil.parse(windTowerInfo)); windTowerInfoService.save(windTowerInfo); } // 例如: key:wsAve val:数据 HashMap pointMap = new HashMap<>(); //分解成Obj4RequestCoil List obj4RequestRegisters = null; try { obj4RequestRegisters = ModbusRequestDataUtils.splitModbusRequest(pointTypeMap, 1, FunctionCode.READ_HOLDING_REGISTERS); for (Obj4RequestRegister obj4RequestRegister : obj4RequestRegisters) { Map registerData = ModbusRequestDataUtils.getRegisterData(master, obj4RequestRegister); if (!registerData.isEmpty()) { for (Map.Entry typeEntry : registerData.entrySet()) { for (Map.Entry entry : stringStringBuilderMap.entrySet()) { if (typeEntry.getKey().equals(entry.getValue())) { log.info("点位:{}-----数值:{}", typeEntry.getKey(), ((NumericModbusData) typeEntry.getValue()).getValue()); //点位一致 置换value pointMap.put(entry.getKey(), ((NumericModbusData) typeEntry.getValue()).getValue()); } } } } } } catch (ModbusException e) { log.info(e.getMsg()); throw new RuntimeException(e); } wsHeightSet.addAll(wdHeightSet); //List> mapList = new CheckDataRecode().checkValue((List>) pointMap, "station"); //log.info("校验前数据:{} ;;; 校验后数据:{}",JSONUtil.parse(pointMap),JSONUtil.parse(mapList)); windTowerDataParentTableService.saveDataForTunnel(pointMap, wsHeightSet, tunnelInfo.getEquipmentNo(), time); }catch (Exception e){ log.error("modbus 接入测风塔:{} 的数据错误:{}",masterBuilderEntry.getKey() ,e); } } } //获取点表所对应数据 public Map savePoints(List channelPoints, HashSet wsHeightSet, HashSet wdHeightSet) { //如果数据库没有此测风塔 读取点表信息增加塔表 HashMap dataMap = new HashMap<>(); for (PointAttribute channelPoint : channelPoints) { if(channelPoint.getUnit() == null){ channelPoint.setUnit(""); } if ((channelPoint.getUnit().equals("y") || channelPoint.getUnit().equals("Y")) && channelPoint.getMeaning().contains("年")) { dataMap.put("yyyy", channelPoint.getPoint()); } else if ((channelPoint.getUnit().equals("y") || channelPoint.getMeaning().equals("Y")) && channelPoint.getMeaning().contains("月")) { dataMap.put("MM", channelPoint.getPoint()); } else if (channelPoint.getUnit().equals("d") && channelPoint.getMeaning().contains("日")) { dataMap.put("dd", channelPoint.getPoint()); } else if (channelPoint.getUnit().equals("h") && channelPoint.getMeaning().contains("时")) { dataMap.put("HH", channelPoint.getPoint()); } else if (channelPoint.getUnit().equals("m") && channelPoint.getMeaning().contains("分")) { dataMap.put("mm", channelPoint.getPoint()); } else if (channelPoint.getMeaning().contains("纬")) { dataMap.put("latitude", channelPoint.getPoint()); } else if (channelPoint.getMeaning().contains("经")) { dataMap.put("longitude", channelPoint.getPoint()); } else if (channelPoint.getMeaning().contains("海拔")) { dataMap.put("elevation", channelPoint.getPoint()); } else if (channelPoint.getMeaning().contains("电压")) { String status = status(channelPoint.getMeaning()); dataMap.put("v" + status, channelPoint.getPoint()); } else { String dataType = dataType(channelPoint.getMeaning(), wsHeightSet, wdHeightSet); if (!dataType.equals("")) { dataMap.put(dataType, channelPoint.getPoint()); } } } return dataMap; } public String dataType(String str, HashSet wsHeightSet, HashSet wdHeightSet) { String typeStr = ""; int index; if (!str.contains("米") && !str.contains("m")) { return ""; } if (str.contains("米")) { index = str.indexOf("米"); } else { index = str.indexOf("m"); } //截取风速 String dataStr = str.substring(0, index); Integer height = Integer.valueOf(CalculationUtil.getNumberFromString(dataStr)); String dataType = ""; typeStr = status(str); if (str.contains("风速") && !str.contains("风向")) { dataType = "ws"; wsHeightSet.add(height); } else if (str.contains("风向")) { dataType = "wd"; wdHeightSet.add(height); } else if (str.contains("湿度")) { dataType = "rh"; return dataType + typeStr; } else if (str.contains("温度")) { dataType = "t"; return dataType + typeStr; } else if (str.contains("气压") || str.contains("压力")) { dataType = "pa"; return dataType + typeStr; } return dataType + typeStr + height; } public String status(String str) { String typeStr = ""; if (str.contains("平均值") && !str.contains("G")) { typeStr = "Ave"; } else if (str.contains("方差") || str.contains("标准偏差") || str.contains("标准差")) { typeStr = "Sta"; } else if (str.contains("最大值") && !str.contains("极大")) { typeStr = "Max"; } else if (str.contains("最小值")) { typeStr = "Min"; } else if (str.contains("瞬时") ||str.contains("实时")) { typeStr = "Inst"; } else if (str.contains("极大")) { typeStr = "Great"; } else if (str.contains("Gust") || str.contains("GUST")) { typeStr = "Gust"; } else if (str.contains("风速") && str.contains("风向")) { typeStr = "OnWs"; } else if (str.contains("实时")) { typeStr = "Now"; } return typeStr; } }