ReceiveDataService.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. package com.jiayue.ipfcst.console.service;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.alibaba.fastjson.TypeReference;
  6. import com.jiayue.ipfcst.common.core.util.DateTimeUtil;
  7. import com.jiayue.ipfcst.common.data.constant.enums.EquipmentTypeEnum;
  8. import com.jiayue.ipfcst.common.data.entity.*;
  9. import com.jiayue.ipfcst.common.data.repository.*;
  10. import com.jiayue.ipfcst.console.util.RedisUtils;
  11. import org.apache.http.HttpEntity;
  12. import org.apache.http.client.config.RequestConfig;
  13. import org.apache.http.client.methods.CloseableHttpResponse;
  14. import org.apache.http.client.methods.HttpPost;
  15. import org.apache.http.entity.StringEntity;
  16. import org.apache.http.impl.client.CloseableHttpClient;
  17. import org.apache.http.impl.client.HttpClientBuilder;
  18. import org.apache.http.util.EntityUtils;
  19. import org.springframework.beans.factory.annotation.Value;
  20. import lombok.extern.slf4j.Slf4j;
  21. import org.springframework.beans.factory.annotation.Autowired;
  22. import org.springframework.stereotype.Service;
  23. import java.io.IOException;
  24. import java.math.BigDecimal;
  25. import java.nio.charset.StandardCharsets;
  26. import java.text.SimpleDateFormat;
  27. import java.util.*;
  28. /**
  29. * 接收数据service类
  30. *
  31. * @author whc
  32. */
  33. @Service
  34. @Slf4j
  35. public class ReceiveDataService {
  36. @Autowired
  37. ElectricFieldService electricFieldService;
  38. @Autowired
  39. WindTowerInfoService windTowerInfoService;
  40. @Autowired
  41. WindTurbineInfoService windTurbineInfoService;
  42. @Autowired
  43. WeatherStationInfoService weatherStationInfoService;
  44. @Autowired
  45. InverterInfoService inverterInfoService;
  46. @Autowired
  47. DataPointService dataPointService;
  48. @Autowired
  49. WeatherStationStatusDataRepository weatherStationStatusDataRepository;
  50. @Autowired
  51. InverterStatusDataRepository inverterStatusDataRepository;
  52. @Autowired
  53. WindTurbineStatusDataRepository windTurbineStatusDataRepository;
  54. @Autowired
  55. WindTowerStatusDataRepository windTowerStatusDataRepository;
  56. @Autowired
  57. RedisUtils redisUtils;
  58. @Autowired
  59. EquipmentAttributeService equipmentAttributeService;
  60. @Autowired
  61. PowerStationStatusDataRepository powerStationStatusDataRepository;
  62. @Autowired
  63. PowerStationDataPackerContainer powerStationDataPackerContainer;
  64. @Value("${receive.ip}")
  65. private String ip;
  66. @Value("${receive.port}")
  67. private String port;
  68. @Value("${receive.path}")
  69. private String path;
  70. private final String Active = "activePower";
  71. //请求超时时间,这个时间定义了socket读数据的超时时间,也就是连接到服务器之后到从服务器获取响应数据需要等待的时间,发生超时,会抛出SocketTimeoutException异常。
  72. private static final int SOCKET_TIME_OUT = 5000;
  73. //连接超时时间,这个时间定义了通过网络与服务器建立连接的超时时间,也就是取得了连接池中的某个连接之后到接通目标url的连接等待时间。发生超时,会抛出ConnectionTimeoutException异常
  74. private static final int CONNECT_TIME_OUT = 3000;
  75. public void receive() {
  76. List<ElectricField> electricFieldList = electricFieldService.getAll();
  77. //每个场站请求一次
  78. for (ElectricField electricField : electricFieldList) {
  79. log.info(electricField.getName() + "开始请求数据");
  80. Long startTime = DateTimeUtil.getCurrentTimeForMinute().getTime();
  81. Long endTime = startTime + 60000L;
  82. try {
  83. //通用查询
  84. if (electricField.getElectricFieldTypeEnum().getCode() == 1) {
  85. //气象站
  86. List<WeatherStationInfo> weatherStationInfoList = weatherStationInfoService.get(electricField.getStationCode());
  87. //逆变器
  88. List<InverterInfo> inverterInfoList = inverterInfoService.getByStationCode(electricField.getStationCode());
  89. //按设备请求
  90. for (WeatherStationInfo weatherStationInfo : weatherStationInfoList) {
  91. map(electricField, startTime, endTime, weatherStationInfo.getEquipmentNo(), weatherStationInfo.getId(), weatherStationInfo.getEquipmentType());
  92. }
  93. for (InverterInfo inverterInfo : inverterInfoList) {
  94. map(electricField, startTime, endTime, inverterInfo.getEquipmentNo(), inverterInfo.getId(), inverterInfo.getEquipmentType());
  95. }
  96. } else {
  97. //测风塔
  98. List<WindTowerInfo> windTowerInfoList = windTowerInfoService.get(electricField.getStationCode());
  99. //风机
  100. List<WindTurbineInfo> windTurbineInfoList = windTurbineInfoService.getByStationCode(electricField.getStationCode());
  101. for (WindTowerInfo windTowerInfo : windTowerInfoList) {
  102. map(electricField, startTime, endTime, windTowerInfo.getEquipmentNo(), windTowerInfo.getId(), windTowerInfo.getEquipmentType());
  103. }
  104. for (WindTurbineInfo windTurbineInfo : windTurbineInfoList) {
  105. map(electricField, startTime, endTime, windTurbineInfo.getEquipmentNo(), windTurbineInfo.getId(), windTurbineInfo.getEquipmentType());
  106. }
  107. }
  108. savePowerStationStatusData(electricField);
  109. } catch (Exception e) {
  110. e.printStackTrace();
  111. }
  112. }
  113. }
  114. private void map(ElectricField electricField, Long startTime, Long endTime, String equipmentNo, Integer id, EquipmentTypeEnum equipmentType) {
  115. HashMap<String, Object> paramMap = new HashMap<>();
  116. paramMap.put("deviceIds", equipmentNo);
  117. paramMap.put("startTime", startTime);
  118. paramMap.put("endTime", endTime);
  119. log.info("param"+JSON.toJSONString(paramMap));
  120. String body = httpClient(paramMap);
  121. AnalysisData(body, id, equipmentType, electricField);
  122. }
  123. /**
  124. * 解析数据
  125. *
  126. * @param body 所有数据
  127. * @param equipmentId 设备id
  128. * @param equipmentType 设备类型
  129. * @param electricField 场站对象
  130. */
  131. public void AnalysisData(String body, Integer equipmentId, EquipmentTypeEnum equipmentType, ElectricField electricField) {
  132. try {
  133. List<DataPoint> dataPointList = dataPointService.getByEquipmentType(equipmentType);
  134. JSONObject jsonObject= JSON.parseObject(body);
  135. JSONArray jsonResults = (JSONArray) jsonObject.get("results");
  136. if (jsonResults.size()>0){
  137. JSONObject jsonObj = jsonResults.getJSONObject(0);
  138. JSONArray jsonRows = (JSONArray) jsonObj.get("rows");
  139. //所有设备点位
  140. JSONArray sensorIds = (JSONArray) jsonObj.get("sensorIds");
  141. JSONObject values;
  142. Map<String, String> map = new HashMap<>();
  143. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  144. if (jsonRows.size()>0){
  145. values = jsonRows.getJSONObject(0);
  146. //时间戳
  147. String time = values.getString("timestamp");
  148. //所有数据
  149. JSONArray jsonValues = (JSONArray) values.get("values");
  150. long timestamp = Long.parseLong(time);
  151. map.put("stationCode",electricField.getStationCode());
  152. map.put("equipmentId", equipmentId.toString());
  153. map.put("time", simpleDateFormat.format(timestamp));
  154. for (DataPoint dataPoint : dataPointList) {
  155. for (int i = 0; i < sensorIds.size(); i++) {
  156. if (dataPoint.getMeasuringPoint().equals(sensorIds.get(i))) {
  157. if(jsonValues.get(i) == null ){
  158. map.put(dataPoint.getEquipmentAttribute().getFieldName(), "-99");
  159. }else{
  160. map.put(dataPoint.getEquipmentAttribute().getFieldName(), jsonValues.get(i).toString());
  161. }
  162. }
  163. }
  164. }
  165. String STATUS = "status";
  166. switch (map.get(STATUS)) {
  167. case "1.00":
  168. case "true":
  169. //运行
  170. map.put(STATUS, "1");
  171. break;
  172. case "2.00":
  173. //待机
  174. map.put(STATUS, "2");
  175. break;
  176. case "3.00":
  177. //停用
  178. map.put(STATUS, "3");
  179. break;
  180. case "4.00":
  181. case "false":
  182. //故障
  183. map.put(STATUS, "4");
  184. break;
  185. case "-99":
  186. map.put(STATUS, "5");
  187. break;
  188. default:
  189. break;
  190. }
  191. String ap = map.get(Active);
  192. switch(equipmentType.getCode()){
  193. case 1:
  194. //气象站
  195. redisUtils.hmset("qxz-" + electricField.getStationCode() + "-" + equipmentId, map);
  196. WeatherStationStatusData weatherStationStatusData = JSON.parseObject(JSON.toJSONString(map), WeatherStationStatusData.class);
  197. weatherStationStatusDataRepository.save(weatherStationStatusData);
  198. log.info("qxz-" + electricField.getStationCode() + "-" + equipmentId+"已存入");
  199. break;
  200. case 2:
  201. //逆变器
  202. if (ap != null && map.get("electricalCurrent") != null) {
  203. //有功
  204. BigDecimal activePower = new BigDecimal(ap);
  205. //电流
  206. BigDecimal electricalCurrent = new BigDecimal(map.get("electricalCurrent"));
  207. //有功/电流=电压
  208. BigDecimal voltage = activePower.divide(electricalCurrent, 2, BigDecimal.ROUND_HALF_UP);
  209. map.put("voltage", voltage.toString());
  210. }
  211. redisUtils.hmset("nbq-" + electricField.getStationCode() + "-" + equipmentId, map);
  212. InverterStatusData inverterStatusData = JSON.parseObject(JSON.toJSONString(map), InverterStatusData.class);
  213. inverterStatusDataRepository.save(inverterStatusData);
  214. log.info("nbq-" + electricField.getStationCode() + "-" + equipmentId+"已存入");
  215. break;
  216. case 3:
  217. redisUtils.hmset("fj-" + electricField.getStationCode() + "-" + equipmentId, map);
  218. WindTurbineStatusData windTurbineStatusData = JSON.parseObject(JSON.toJSONString(map), WindTurbineStatusData.class);
  219. windTurbineStatusDataRepository.save(windTurbineStatusData);
  220. log.info("fj-" + electricField.getStationCode() + "-" + equipmentId+"已存入");
  221. break;
  222. case 4:
  223. redisUtils.hmset("cft-" + electricField.getStationCode() + "-" + equipmentId, map);
  224. WindTowerStatusData windTowerStatusData = JSON.parseObject(JSON.toJSONString(map), WindTowerStatusData.class);
  225. windTowerStatusDataRepository.save(windTowerStatusData);
  226. log.info("cft-" + electricField.getStationCode() + "-" + equipmentId+"已存入");
  227. break;
  228. default:
  229. break;
  230. }
  231. }else {
  232. log.info(electricField.getStationCode()+ "-" +equipmentType.getMessage() + "-" + equipmentId+"无数据,不进行任何操作");
  233. }
  234. }else {
  235. log.info(electricField.getStationCode()+ "-" +equipmentType.getMessage() + "-" + equipmentId+"对方没有该设备,请核对设备编号");
  236. }
  237. } catch (Exception e) {
  238. log.info(electricField.getName()+equipmentType.getMessage()+"接数程序异常");
  239. e.printStackTrace();
  240. }
  241. }
  242. /**
  243. * 发http请求
  244. * @param paramMap 参数
  245. * @return 返回的数据
  246. */
  247. public String httpClient( HashMap<String, Object> paramMap){
  248. String body = "";
  249. try{
  250. CloseableHttpClient httpClient = HttpClientBuilder.create().build();
  251. HttpPost httpPost = new HttpPost("http://"+ip+":"+port+path);
  252. StringEntity entity = new StringEntity(JSON.toJSONString(paramMap),"UTF-8");
  253. httpPost.setEntity(entity);
  254. //设置请求超时时间,链接超时时间
  255. RequestConfig reqConfig = RequestConfig.custom().setSocketTimeout(SOCKET_TIME_OUT).setConnectTimeout(CONNECT_TIME_OUT).build();
  256. httpPost.setConfig(reqConfig);
  257. httpPost.setHeader("Content-Type", "application/json; charset=utf-8");
  258. CloseableHttpResponse response;
  259. response = httpClient.execute(httpPost);
  260. HttpEntity responseEntity = response.getEntity();
  261. body = EntityUtils.toString(responseEntity, StandardCharsets.UTF_8);
  262. }catch (RuntimeException | IOException e){
  263. e.printStackTrace();
  264. log.info("请求异常");
  265. log.info("所用线程"+Thread.currentThread().getName());
  266. log.info("停止请求数据");
  267. Thread.currentThread().stop();
  268. }
  269. return body;
  270. }
  271. public void savePowerStationStatusData(ElectricField electricField) {
  272. try {
  273. log.info("开始计算实际功率");
  274. BigDecimal pssd = new BigDecimal("0");
  275. if (electricField.getElectricFieldTypeEnum().getCode() == 1) {
  276. List<InverterInfo> inverterInfoList = inverterInfoService.getByStationCode(electricField.getStationCode());
  277. for (InverterInfo inverterInfo : inverterInfoList) {
  278. Map<String, String> getMap = redisUtils.hgetall("nbq-" + electricField.getStationCode() + "-" + inverterInfo.getId());
  279. String activePower = getMap.get(Active);
  280. if (activePower != null) {
  281. pssd = pssd.add(new BigDecimal(activePower));
  282. }
  283. }
  284. } else {
  285. List<WindTurbineInfo> windTurbineInfoList = windTurbineInfoService.getByStationCode(electricField.getStationCode());
  286. for (WindTurbineInfo windTurbineInfo : windTurbineInfoList) {
  287. Map<String, String> getMap = redisUtils.hgetall("fj-" + electricField.getStationCode() + "-" + windTurbineInfo.getId());
  288. String activePower = getMap.get(Active);
  289. if (activePower != null) {
  290. pssd = pssd.add(new BigDecimal(activePower));
  291. }
  292. }
  293. }
  294. if (pssd.compareTo(new BigDecimal("0")) > 0) {
  295. pssd = pssd.divide(new BigDecimal("1000"), 2, BigDecimal.ROUND_HALF_UP);
  296. }
  297. PowerStationStatusData p = powerStationDataPackerContainer.getDataPacker(electricField.getStationCode()).packageData(pssd);
  298. powerStationStatusDataRepository.save(p);
  299. //对象转map
  300. Map<String, String> map = JSON.parseObject(JSON.toJSONString(p), new TypeReference<Map<String, String>>() {
  301. });
  302. redisUtils.hmset("power-" + electricField.getStationCode(), map);
  303. log.info("实际功率结束");
  304. } catch (Exception e) {
  305. e.printStackTrace();
  306. }
  307. }
  308. public void stop(){
  309. Thread thread = new Thread(() -> {
  310. while (Thread.currentThread().isInterrupted()) {
  311. log.info("停止请求数据");
  312. }
  313. });
  314. thread.start();
  315. try {
  316. Thread.sleep(1);
  317. } catch (Exception e) {
  318. e.printStackTrace();
  319. }
  320. thread.interrupt();
  321. }
  322. }