|
@@ -0,0 +1,289 @@
|
|
|
+package com.jiayue.biz.job;
|
|
|
+
|
|
|
+import cn.hutool.core.thread.ThreadFactoryBuilder;
|
|
|
+import com.jiayue.biz.domain.PointAttribute;
|
|
|
+import com.jiayue.biz.domain.TunnelInfo;
|
|
|
+import com.jiayue.biz.domain.WindTowerInfo;
|
|
|
+import com.jiayue.biz.service.PointAttributeService;
|
|
|
+import com.jiayue.biz.service.TunnelInfoService;
|
|
|
+import com.jiayue.biz.service.WindTowerInfoService;
|
|
|
+import com.jiayue.biz.service.impl.CheckDataRecode;
|
|
|
+import com.jiayue.biz.service.impl.WindTowerDataParentTableServiceImpl;
|
|
|
+import com.jiayue.biz.util.CalculationUtil;
|
|
|
+import com.jiayue.common.utils.DateUtil;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import wei.yigulu.modbus.domain.FunctionCode;
|
|
|
+import wei.yigulu.modbus.domain.Obj4RequestRegister;
|
|
|
+import wei.yigulu.modbus.domain.datatype.IModbusDataType;
|
|
|
+import wei.yigulu.modbus.domain.datatype.ModbusDataTypeEnum;
|
|
|
+import wei.yigulu.modbus.domain.datatype.NumericModbusData;
|
|
|
+import wei.yigulu.modbus.exceptiom.ModbusException;
|
|
|
+import wei.yigulu.modbus.netty.ModbusTcpMasterBuilder;
|
|
|
+import wei.yigulu.modbus.utils.ModbusRequestDataUtils;
|
|
|
+
|
|
|
+import java.math.BigDecimal;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.SynchronousQueue;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+@Service
|
|
|
+@Slf4j
|
|
|
+public class ModbusReciveJob {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ TunnelInfoService tunnelInfoService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ PointAttributeService pointAttributeService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ WindTowerDataParentTableServiceImpl windTowerDataParentTableService;
|
|
|
+ @Autowired
|
|
|
+ WindTowerInfoService windTowerInfoService;
|
|
|
+
|
|
|
+
|
|
|
+ private final ExecutorService calculatorThread = new ThreadPoolExecutor(15, 1000,
|
|
|
+ 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder().setNamePrefix("ModbusReciveTaskThread-").build());
|
|
|
+
|
|
|
+
|
|
|
+ private HashMap<String, ModbusTcpMasterBuilder> masterMap = new HashMap<>();
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public void timingTunnel() {
|
|
|
+ List<TunnelInfo> tunnelInfoList = tunnelInfoService.list();
|
|
|
+ for (TunnelInfo tunnelInfo : tunnelInfoList) {
|
|
|
+ ModbusTcpMasterBuilder master = start(tunnelInfo);
|
|
|
+ masterMap.put(tunnelInfo.getStationId(), master);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //创建线程
|
|
|
+ public ModbusTcpMasterBuilder start(TunnelInfo tunnelInfo) {
|
|
|
+ //创建通道后 对通道map 进行 采集数据
|
|
|
+ //获取场站id 形成map
|
|
|
+ ModbusTcpMasterBuilder master = new ModbusTcpMasterBuilder(tunnelInfo.getIp(), tunnelInfo.getPort());
|
|
|
+ master.getConfigInfoMap().put("stationId", tunnelInfo.getStationId());
|
|
|
+ //create 会阻塞线程
|
|
|
+ calculatorThread.execute(master::create);
|
|
|
+ master.isConnected();
|
|
|
+ return master;
|
|
|
+ }
|
|
|
+
|
|
|
+ //关闭通道移除连接
|
|
|
+ public void stop(String stationId) {
|
|
|
+ if (masterMap.get(stationId).isConnected()) {
|
|
|
+ masterMap.get(stationId).stop();
|
|
|
+ masterMap.remove(stationId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ public void tunnel() {
|
|
|
+ List<TunnelInfo> tunnelInfoList = tunnelInfoService.list();
|
|
|
+ List<PointAttribute> attributeList = pointAttributeService.list();
|
|
|
+ String time = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm");
|
|
|
+ //TODO 现场测试筛选数据是否能正常入库
|
|
|
+// List<HashMap<String, Object>> mapList = new ArrayList<>();
|
|
|
+ for (Map.Entry<String, ModbusTcpMasterBuilder> masterBuilderEntry : masterMap.entrySet()) {
|
|
|
+ //获取通道map中的key
|
|
|
+ String stationId = masterBuilderEntry.getKey();
|
|
|
+ //风速层高
|
|
|
+ HashSet<Integer> wsHeightSet = new HashSet<>();
|
|
|
+ //风向层高
|
|
|
+ HashSet<Integer> wdHeightSet = new HashSet<>();
|
|
|
+ //根据通道key获取ip端口号等数据
|
|
|
+ List<TunnelInfo> tunnelist = tunnelInfoList.stream().filter(t -> t.getStationId().equals(stationId)).collect(Collectors.toList());
|
|
|
+ TunnelInfo tunnelInfo;
|
|
|
+ if (!tunnelist.isEmpty()) {
|
|
|
+ tunnelInfo = tunnelist.get(0);
|
|
|
+ } else {
|
|
|
+ //如果此通道不存在 关闭通道
|
|
|
+ this.stop(stationId);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ //根据通道key获取点表数据
|
|
|
+ List<PointAttribute> pointAttributeList = attributeList.stream().filter(a -> a.getStationId().equals(stationId)).collect(Collectors.toList());
|
|
|
+ //拿到所有点位
|
|
|
+ List<Integer> points = pointAttributeList.stream().map(PointAttribute::getPoint).collect(Collectors.toList());
|
|
|
+ Map<Integer, ModbusDataTypeEnum> pointTypeMap = new HashMap<>();
|
|
|
+ for (Integer point : points) {
|
|
|
+ //点位以及点位类型存入map
|
|
|
+ pointTypeMap.put(point, ModbusDataTypeEnum.valueOf(tunnelInfo.getDataFormat()));
|
|
|
+ }
|
|
|
+
|
|
|
+ //获取连接
|
|
|
+ ModbusTcpMasterBuilder master = masterBuilderEntry.getValue();
|
|
|
+ //获取点位信息map
|
|
|
+ Map<String, Integer> stringStringBuilderMap = savePoints(pointAttributeList, wsHeightSet, wdHeightSet);
|
|
|
+ WindTowerInfo windTowerInfo = new WindTowerInfo();
|
|
|
+ //判断是否存在此塔
|
|
|
+ boolean emptyOfEntity = windTowerInfoService.lambdaQuery().eq(WindTowerInfo::getEquipmentNo, tunnelInfo.getEquipmentNo()).list().isEmpty();
|
|
|
+ if (emptyOfEntity) {
|
|
|
+ StringBuilder wsHeightStr = new StringBuilder();
|
|
|
+ for (Integer height : wsHeightSet) {
|
|
|
+ wsHeightStr = wsHeightStr.append(height + ",");
|
|
|
+ }
|
|
|
+ String wsHeight = null;
|
|
|
+ if (wsHeightStr.length() > 0) {
|
|
|
+ wsHeight = wsHeightStr.substring(0, wsHeightStr.length() - 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ StringBuilder wdHeightStr = new StringBuilder();
|
|
|
+ for (Integer height : wsHeightSet) {
|
|
|
+ wdHeightStr = wdHeightStr.append(height + ",");
|
|
|
+ }
|
|
|
+ String wdHeight = null;
|
|
|
+ if (wdHeightStr.length() > 0) {
|
|
|
+ wdHeight = wdHeightStr.substring(0, wdHeightStr.length() - 1);
|
|
|
+ }
|
|
|
+ //存储测风塔信息
|
|
|
+ windTowerInfo.setEquipmentNo(tunnelist.get(0).getEquipmentNo());
|
|
|
+ windTowerInfo.setName(tunnelist.get(0).getStationId());
|
|
|
+ windTowerInfo.setHeights(wsHeight);
|
|
|
+ windTowerInfo.setWdHeights(wdHeight);
|
|
|
+ windTowerInfo.setType("station");
|
|
|
+ windTowerInfoService.save(windTowerInfo);
|
|
|
+ }
|
|
|
+ // 例如: key:wsAve val:数据
|
|
|
+ HashMap<String, BigDecimal> pointMap = new HashMap<>();
|
|
|
+ //分解成Obj4RequestCoil
|
|
|
+ List<Obj4RequestRegister> obj4RequestRegisters = null;
|
|
|
+ try {
|
|
|
+ obj4RequestRegisters = ModbusRequestDataUtils.splitModbusRequest(pointTypeMap, 1, FunctionCode.READ_HOLDING_REGISTERS);
|
|
|
+ for (Obj4RequestRegister obj4RequestRegister : obj4RequestRegisters) {
|
|
|
+ Map<Integer, IModbusDataType> registerData = ModbusRequestDataUtils.getRegisterData(master, obj4RequestRegister);
|
|
|
+ if(!registerData.isEmpty()){
|
|
|
+ for (Map.Entry<Integer, IModbusDataType> typeEntry : registerData.entrySet()) {
|
|
|
+ for (Map.Entry<String, Integer> entry : stringStringBuilderMap.entrySet()) {
|
|
|
+ if (typeEntry.getKey().equals(entry.getValue())) {
|
|
|
+ log.info("点位:{}-----数值:{}",typeEntry.getKey(),((NumericModbusData) typeEntry.getValue()).getValue());
|
|
|
+ //点位一致 置换value
|
|
|
+ pointMap.put(entry.getKey(), ((NumericModbusData) typeEntry.getValue()).getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ } catch (ModbusException e) {
|
|
|
+ log.info(e.getMsg());
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ wsHeightSet.addAll(wdHeightSet);
|
|
|
+
|
|
|
+ List<Map<String, Object>> mapList = new CheckDataRecode().checkValue((List<Map<String, Object>>) pointMap, "station");
|
|
|
+
|
|
|
+ windTowerDataParentTableService.saveDataForTunnel(pointMap, wsHeightSet,tunnelInfo.getEquipmentNo(),time);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ //获取点表所对应数据
|
|
|
+ public Map<String, Integer> savePoints(List<PointAttribute> channelPoints, HashSet<Integer> wsHeightSet, HashSet<Integer> wdHeightSet) {
|
|
|
+ //如果数据库没有此测风塔 读取点表信息增加塔表
|
|
|
+ HashMap<String, Integer> dataMap = new HashMap<>();
|
|
|
+ for (PointAttribute channelPoint : channelPoints) {
|
|
|
+ if(channelPoint.getUnit() == null){
|
|
|
+ channelPoint.setUnit("");
|
|
|
+ }
|
|
|
+ if ((channelPoint.getUnit().equals("y") || channelPoint.getUnit().equals("Y")) && channelPoint.getMeaning().contains("年")) {
|
|
|
+ dataMap.put("yyyy", channelPoint.getPoint());
|
|
|
+ } else if ((channelPoint.getUnit().equals("y") || channelPoint.getMeaning().equals("Y")) && channelPoint.getMeaning().contains("月")) {
|
|
|
+ dataMap.put("MM", channelPoint.getPoint());
|
|
|
+ } else if (channelPoint.getUnit().equals("d") && channelPoint.getMeaning().contains("日")) {
|
|
|
+ dataMap.put("dd", channelPoint.getPoint());
|
|
|
+ } else if (channelPoint.getUnit().equals("h") && channelPoint.getMeaning().contains("时")) {
|
|
|
+ dataMap.put("HH", channelPoint.getPoint());
|
|
|
+ } else if (channelPoint.getUnit().equals("m") && channelPoint.getMeaning().contains("分")) {
|
|
|
+ dataMap.put("mm", channelPoint.getPoint());
|
|
|
+ } else if (channelPoint.getMeaning().contains("纬")) {
|
|
|
+ dataMap.put("latitude", channelPoint.getPoint());
|
|
|
+ } else if (channelPoint.getMeaning().contains("经")) {
|
|
|
+ dataMap.put("longitude", channelPoint.getPoint());
|
|
|
+ } else if (channelPoint.getMeaning().contains("海拔")) {
|
|
|
+ dataMap.put("elevation", channelPoint.getPoint());
|
|
|
+ } else if (channelPoint.getMeaning().contains("电压")) {
|
|
|
+ String status = status(channelPoint.getMeaning());
|
|
|
+ dataMap.put("v" + status, channelPoint.getPoint());
|
|
|
+ } else {
|
|
|
+ String dataType = dataType(channelPoint.getMeaning(), wsHeightSet, wdHeightSet);
|
|
|
+ if (!dataType.equals("")) {
|
|
|
+ dataMap.put(dataType, channelPoint.getPoint());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ return dataMap;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public String dataType(String str, HashSet<Integer> wsHeightSet, HashSet<Integer> wdHeightSet) {
|
|
|
+ String typeStr = "";
|
|
|
+ int index;
|
|
|
+ if (!str.contains("米") && !str.contains("m")) {
|
|
|
+ return "";
|
|
|
+ }
|
|
|
+ if (str.contains("米")) {
|
|
|
+ index = str.indexOf("米");
|
|
|
+ } else {
|
|
|
+ index = str.indexOf("m");
|
|
|
+ }
|
|
|
+ //截取风速
|
|
|
+ String dataStr = str.substring(0, index);
|
|
|
+ Integer height = Integer.valueOf(CalculationUtil.getNumberFromString(dataStr));
|
|
|
+ String dataType = "";
|
|
|
+ typeStr = status(str);
|
|
|
+
|
|
|
+ if (str.contains("风速") && !str.contains("风向")) {
|
|
|
+ dataType = "ws";
|
|
|
+ wsHeightSet.add(height);
|
|
|
+ } else if (str.contains("风向")) {
|
|
|
+ dataType = "wd";
|
|
|
+ wdHeightSet.add(height);
|
|
|
+ } else if (str.contains("湿度")) {
|
|
|
+ dataType = "rh";
|
|
|
+ return dataType + typeStr;
|
|
|
+ } else if (str.contains("温度")) {
|
|
|
+ dataType = "t";
|
|
|
+ return dataType + typeStr;
|
|
|
+ } else if (str.contains("气压") || str.contains("压力")) {
|
|
|
+ dataType = "pa";
|
|
|
+ return dataType + typeStr;
|
|
|
+ }
|
|
|
+ return dataType + typeStr + height;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String status(String str) {
|
|
|
+ String typeStr = "";
|
|
|
+ if (str.contains("平均值") && !str.contains("G")) {
|
|
|
+ typeStr = "Ave";
|
|
|
+ } else if (str.contains("方差") || str.contains("标准偏差") || str.contains("标准差")) {
|
|
|
+ typeStr = "Sta";
|
|
|
+ } else if (str.contains("最大值") && !str.contains("极大")) {
|
|
|
+ typeStr = "Max";
|
|
|
+ } else if (str.contains("最小值")) {
|
|
|
+ typeStr = "Min";
|
|
|
+ } else if (str.contains("瞬时")) {
|
|
|
+ typeStr = "Inst";
|
|
|
+ } else if (str.contains("极大")) {
|
|
|
+ typeStr = "Great";
|
|
|
+ } else if (str.contains("Gust") || str.contains("GUST")) {
|
|
|
+ typeStr = "Gust";
|
|
|
+ } else if (str.contains("风速") && str.contains("风向")) {
|
|
|
+ typeStr = "OnWs";
|
|
|
+ } else if (str.contains("实时")) {
|
|
|
+ typeStr = "Now";
|
|
|
+ }
|
|
|
+ return typeStr;
|
|
|
+ }
|
|
|
+}
|