ModbusReciveJob.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package com.jiayue.biz.job;
  2. import cn.hutool.core.thread.ThreadFactoryBuilder;
  3. import cn.hutool.json.JSONUtil;
  4. import com.jiayue.biz.domain.PointAttribute;
  5. import com.jiayue.biz.domain.TunnelInfo;
  6. import com.jiayue.biz.domain.WindTowerInfo;
  7. import com.jiayue.biz.service.PointAttributeService;
  8. import com.jiayue.biz.service.TunnelInfoService;
  9. import com.jiayue.biz.service.WindTowerInfoService;
  10. import com.jiayue.biz.service.impl.CheckDataRecode;
  11. import com.jiayue.biz.service.impl.WindTowerDataParentTableServiceImpl;
  12. import com.jiayue.biz.util.CalculationUtil;
  13. import com.jiayue.common.utils.DateUtil;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.context.annotation.Bean;
  17. import org.springframework.stereotype.Service;
  18. import wei.yigulu.modbus.domain.FunctionCode;
  19. import wei.yigulu.modbus.domain.Obj4RequestRegister;
  20. import wei.yigulu.modbus.domain.datatype.IModbusDataType;
  21. import wei.yigulu.modbus.domain.datatype.ModbusDataTypeEnum;
  22. import wei.yigulu.modbus.domain.datatype.NumericModbusData;
  23. import wei.yigulu.modbus.exceptiom.ModbusException;
  24. import wei.yigulu.modbus.netty.ModbusTcpMasterBuilder;
  25. import wei.yigulu.modbus.utils.ModbusRequestDataUtils;
  26. import java.math.BigDecimal;
  27. import java.util.HashMap;
  28. import java.util.HashSet;
  29. import java.util.List;
  30. import java.util.Map;
  31. import java.util.concurrent.ExecutorService;
  32. import java.util.concurrent.SynchronousQueue;
  33. import java.util.concurrent.ThreadPoolExecutor;
  34. import java.util.concurrent.TimeUnit;
  35. import java.util.stream.Collectors;
  36. @Service
  37. @Slf4j
  38. public class ModbusReciveJob {
  39. @Autowired
  40. TunnelInfoService tunnelInfoService;
  41. @Autowired
  42. PointAttributeService pointAttributeService;
  43. @Autowired
  44. WindTowerDataParentTableServiceImpl windTowerDataParentTableService;
  45. @Autowired
  46. WindTowerInfoService windTowerInfoService;
  47. private final ExecutorService calculatorThread = new ThreadPoolExecutor(15, 1000,
  48. 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder().setNamePrefix("ModbusReciveTaskThread-").build());
  49. private HashMap<String, ModbusTcpMasterBuilder> masterMap = new HashMap<>();
  50. @Bean
  51. public void timingTunnel() {
  52. List<TunnelInfo> tunnelInfoList = tunnelInfoService.list();
  53. for (TunnelInfo tunnelInfo : tunnelInfoList) {
  54. ModbusTcpMasterBuilder master = start(tunnelInfo);
  55. masterMap.put(tunnelInfo.getStationId(), master);
  56. }
  57. }
  58. //创建线程
  59. public ModbusTcpMasterBuilder start(TunnelInfo tunnelInfo) {
  60. //创建通道后 对通道map 进行 采集数据
  61. //获取场站id 形成map
  62. ModbusTcpMasterBuilder master = new ModbusTcpMasterBuilder(tunnelInfo.getIp(), tunnelInfo.getPort());
  63. master.getConfigInfoMap().put("stationId", tunnelInfo.getStationId());
  64. //create 会阻塞线程
  65. calculatorThread.execute(master::create);
  66. master.isConnected();
  67. return master;
  68. }
  69. //关闭通道移除连接
  70. public void stop(String stationId) {
  71. if (masterMap.get(stationId).isConnected()) {
  72. masterMap.get(stationId).stop();
  73. masterMap.remove(stationId);
  74. }
  75. }
  76. public void tunnel() {
  77. List<TunnelInfo> tunnelInfoList = tunnelInfoService.list();
  78. log.info("接入tunnelInfoList通道:{}", JSONUtil.parse(tunnelInfoList));
  79. List<PointAttribute> attributeList = pointAttributeService.list();
  80. String time = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:00");
  81. log.info("接入modbus时间:{}",time);
  82. //TODO 现场测试筛选数据是否能正常入库
  83. // List<HashMap<String, Object>> mapList = new ArrayList<>();
  84. for (Map.Entry<String, ModbusTcpMasterBuilder> masterBuilderEntry : masterMap.entrySet()) {
  85. try {
  86. //获取通道map中的key
  87. String stationId = masterBuilderEntry.getKey();
  88. //风速层高
  89. HashSet<Integer> wsHeightSet = new HashSet<>();
  90. //风向层高
  91. HashSet<Integer> wdHeightSet = new HashSet<>();
  92. //根据通道key获取ip端口号等数据
  93. List<TunnelInfo> tunnelist = tunnelInfoList.stream().filter(t -> t.getStationId().equals(stationId)).collect(Collectors.toList());
  94. TunnelInfo tunnelInfo;
  95. if (!tunnelist.isEmpty()) {
  96. tunnelInfo = tunnelist.get(0);
  97. } else {
  98. //如果此通道不存在 关闭通道
  99. this.stop(stationId);
  100. continue;
  101. }
  102. //根据通道key获取点表数据
  103. List<PointAttribute> pointAttributeList = attributeList.stream().filter(a -> a.getStationId().equals(stationId)).collect(Collectors.toList());
  104. //拿到所有点位
  105. List<Integer> points = pointAttributeList.stream().map(PointAttribute::getPoint).collect(Collectors.toList());
  106. Map<Integer, ModbusDataTypeEnum> pointTypeMap = new HashMap<>();
  107. for (Integer point : points) {
  108. //点位以及点位类型存入map
  109. pointTypeMap.put(point, ModbusDataTypeEnum.valueOf(tunnelInfo.getDataFormat()));
  110. }
  111. //获取连接
  112. ModbusTcpMasterBuilder master = masterBuilderEntry.getValue();
  113. //获取点位信息map
  114. Map<String, Integer> stringStringBuilderMap = savePoints(pointAttributeList, wsHeightSet, wdHeightSet);
  115. WindTowerInfo windTowerInfo = new WindTowerInfo();
  116. //判断是否存在此塔
  117. boolean emptyOfEntity = windTowerInfoService.lambdaQuery().eq(WindTowerInfo::getEquipmentNo, tunnelInfo.getEquipmentNo()).list().isEmpty();
  118. log.info("处理塔{} 存在:{}", tunnelInfo.getEquipmentNo(), emptyOfEntity);
  119. if (emptyOfEntity) {
  120. StringBuilder wsHeightStr = new StringBuilder();
  121. for (Integer height : wsHeightSet) {
  122. wsHeightStr = wsHeightStr.append(height + ",");
  123. }
  124. String wsHeight = null;
  125. if (wsHeightStr.length() > 0) {
  126. wsHeight = wsHeightStr.substring(0, wsHeightStr.length() - 1);
  127. }
  128. StringBuilder wdHeightStr = new StringBuilder();
  129. for (Integer height : wsHeightSet) {
  130. wdHeightStr = wdHeightStr.append(height + ",");
  131. }
  132. String wdHeight = null;
  133. if (wdHeightStr.length() > 0) {
  134. wdHeight = wdHeightStr.substring(0, wdHeightStr.length() - 1);
  135. }
  136. //存储测风塔信息
  137. windTowerInfo.setEquipmentNo(tunnelist.get(0).getEquipmentNo());
  138. windTowerInfo.setName(tunnelist.get(0).getStationId());
  139. windTowerInfo.setHeights(wsHeight);
  140. windTowerInfo.setWdHeights(wdHeight);
  141. windTowerInfo.setType("station");
  142. log.info("处理数据入库结果:{}", JSONUtil.parse(windTowerInfo));
  143. windTowerInfoService.save(windTowerInfo);
  144. }
  145. // 例如: key:wsAve val:数据
  146. HashMap<String, BigDecimal> pointMap = new HashMap<>();
  147. //分解成Obj4RequestCoil
  148. List<Obj4RequestRegister> obj4RequestRegisters = null;
  149. try {
  150. obj4RequestRegisters = ModbusRequestDataUtils.splitModbusRequest(pointTypeMap, 1, FunctionCode.READ_HOLDING_REGISTERS);
  151. for (Obj4RequestRegister obj4RequestRegister : obj4RequestRegisters) {
  152. Map<Integer, IModbusDataType> registerData = ModbusRequestDataUtils.getRegisterData(master, obj4RequestRegister);
  153. if (!registerData.isEmpty()) {
  154. for (Map.Entry<Integer, IModbusDataType> typeEntry : registerData.entrySet()) {
  155. for (Map.Entry<String, Integer> entry : stringStringBuilderMap.entrySet()) {
  156. if (typeEntry.getKey().equals(entry.getValue())) {
  157. log.info("点位:{}-----数值:{}", typeEntry.getKey(), ((NumericModbusData) typeEntry.getValue()).getValue());
  158. //点位一致 置换value
  159. pointMap.put(entry.getKey(), ((NumericModbusData) typeEntry.getValue()).getValue());
  160. }
  161. }
  162. }
  163. }
  164. }
  165. } catch (ModbusException e) {
  166. log.info(e.getMsg());
  167. throw new RuntimeException(e);
  168. }
  169. wsHeightSet.addAll(wdHeightSet);
  170. //List<Map<String, Object>> mapList = new CheckDataRecode().checkValue((List<Map<String, Object>>) pointMap, "station");
  171. //log.info("校验前数据:{} ;;; 校验后数据:{}",JSONUtil.parse(pointMap),JSONUtil.parse(mapList));
  172. windTowerDataParentTableService.saveDataForTunnel(pointMap, wsHeightSet, tunnelInfo.getEquipmentNo(), time);
  173. }catch (Exception e){
  174. log.error("modbus 接入测风塔:{} 的数据错误:{}",masterBuilderEntry.getKey() ,e);
  175. }
  176. }
  177. }
  178. //获取点表所对应数据
  179. public Map<String, Integer> savePoints(List<PointAttribute> channelPoints, HashSet<Integer> wsHeightSet, HashSet<Integer> wdHeightSet) {
  180. //如果数据库没有此测风塔 读取点表信息增加塔表
  181. HashMap<String, Integer> dataMap = new HashMap<>();
  182. for (PointAttribute channelPoint : channelPoints) {
  183. if(channelPoint.getUnit() == null){
  184. channelPoint.setUnit("");
  185. }
  186. if ((channelPoint.getUnit().equals("y") || channelPoint.getUnit().equals("Y")) && channelPoint.getMeaning().contains("年")) {
  187. dataMap.put("yyyy", channelPoint.getPoint());
  188. } else if ((channelPoint.getUnit().equals("y") || channelPoint.getMeaning().equals("Y")) && channelPoint.getMeaning().contains("月")) {
  189. dataMap.put("MM", channelPoint.getPoint());
  190. } else if (channelPoint.getUnit().equals("d") && channelPoint.getMeaning().contains("日")) {
  191. dataMap.put("dd", channelPoint.getPoint());
  192. } else if (channelPoint.getUnit().equals("h") && channelPoint.getMeaning().contains("时")) {
  193. dataMap.put("HH", channelPoint.getPoint());
  194. } else if (channelPoint.getUnit().equals("m") && channelPoint.getMeaning().contains("分")) {
  195. dataMap.put("mm", channelPoint.getPoint());
  196. } else if (channelPoint.getMeaning().contains("纬")) {
  197. dataMap.put("latitude", channelPoint.getPoint());
  198. } else if (channelPoint.getMeaning().contains("经")) {
  199. dataMap.put("longitude", channelPoint.getPoint());
  200. } else if (channelPoint.getMeaning().contains("海拔")) {
  201. dataMap.put("elevation", channelPoint.getPoint());
  202. } else if (channelPoint.getMeaning().contains("电压")) {
  203. String status = status(channelPoint.getMeaning());
  204. dataMap.put("v" + status, channelPoint.getPoint());
  205. } else {
  206. String dataType = dataType(channelPoint.getMeaning(), wsHeightSet, wdHeightSet);
  207. if (!dataType.equals("")) {
  208. dataMap.put(dataType, channelPoint.getPoint());
  209. }
  210. }
  211. }
  212. return dataMap;
  213. }
  214. public String dataType(String str, HashSet<Integer> wsHeightSet, HashSet<Integer> wdHeightSet) {
  215. String typeStr = "";
  216. int index;
  217. if (!str.contains("米") && !str.contains("m")) {
  218. return "";
  219. }
  220. if (str.contains("米")) {
  221. index = str.indexOf("米");
  222. } else {
  223. index = str.indexOf("m");
  224. }
  225. //截取风速
  226. String dataStr = str.substring(0, index);
  227. Integer height = Integer.valueOf(CalculationUtil.getNumberFromString(dataStr));
  228. String dataType = "";
  229. typeStr = status(str);
  230. if (str.contains("风速") && !str.contains("风向")) {
  231. dataType = "ws";
  232. wsHeightSet.add(height);
  233. } else if (str.contains("风向")) {
  234. dataType = "wd";
  235. wdHeightSet.add(height);
  236. } else if (str.contains("湿度")) {
  237. dataType = "rh";
  238. return dataType + typeStr;
  239. } else if (str.contains("温度")) {
  240. dataType = "t";
  241. return dataType + typeStr;
  242. } else if (str.contains("气压") || str.contains("压力")) {
  243. dataType = "pa";
  244. return dataType + typeStr;
  245. }
  246. return dataType + typeStr + height;
  247. }
  248. public String status(String str) {
  249. String typeStr = "";
  250. if (str.contains("平均值") && !str.contains("G")) {
  251. typeStr = "Ave";
  252. } else if (str.contains("方差") || str.contains("标准偏差") || str.contains("标准差")) {
  253. typeStr = "Sta";
  254. } else if (str.contains("最大值") && !str.contains("极大")) {
  255. typeStr = "Max";
  256. } else if (str.contains("最小值")) {
  257. typeStr = "Min";
  258. } else if (str.contains("瞬时") ||str.contains("实时")) {
  259. typeStr = "Inst";
  260. } else if (str.contains("极大")) {
  261. typeStr = "Great";
  262. } else if (str.contains("Gust") || str.contains("GUST")) {
  263. typeStr = "Gust";
  264. } else if (str.contains("风速") && str.contains("风向")) {
  265. typeStr = "OnWs";
  266. } else if (str.contains("实时")) {
  267. typeStr = "Now";
  268. }
  269. return typeStr;
  270. }
  271. }