瀏覽代碼

添加接数请求类型 切片查询 修改配置文件,由配置文件的mode控制接数请求方式 slice为切片查询 getTaosData为通用接口查询

wanghc 3 年之前
父節點
當前提交
5980e632ab

+ 14 - 6
ipfcst-console/src/main/java/com/jiayue/ipfcst/console/job/ReceiveDataJob.java

@@ -1,15 +1,17 @@
 package com.jiayue.ipfcst.console.job;
 
+import com.jiayue.ipfcst.console.service.ReceiveAllDataService;
 import com.jiayue.ipfcst.console.service.ReceiveDataService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
 
 /**
- * 天气预报下载 停用  现已由外网下发不主动下载
+ * 接收数据定时任务类
  *
  * @author whc
  * @version 3.0
@@ -21,15 +23,21 @@ public class ReceiveDataJob {
 
   @Autowired
   private ReceiveDataService receiveDataService;
-
-
+  @Autowired
+  private ReceiveAllDataService receiveAllDataService;
+  @Value("${receive.mode}")
+  private String mode;
   @Scheduled(fixedRate = 60000)
   public void receiveData() {
 
     log.info("开始向绿能数据中心请求数据");
-
-    this.receiveDataService.receive();
-
+    if(mode.equals("slice")){
+      //切片查询
+      this.receiveAllDataService.receive();
+    }else{
+      //通用接口查询
+      this.receiveDataService.receive();
+    }
     log.info("数据请求结束");
 
   }

+ 409 - 0
ipfcst-console/src/main/java/com/jiayue/ipfcst/console/service/ReceiveAllDataService.java

@@ -0,0 +1,409 @@
+package com.jiayue.ipfcst.console.service;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.jiayue.ipfcst.common.core.util.DateTimeUtil;
+import com.jiayue.ipfcst.common.data.constant.enums.AlarmTypeEnum;
+import com.jiayue.ipfcst.common.data.constant.enums.EquipmentTypeEnum;
+import com.jiayue.ipfcst.common.data.entity.*;
+import com.jiayue.ipfcst.common.data.repository.*;
+import com.jiayue.ipfcst.common.data.service.BaseService;
+import com.jiayue.ipfcst.console.util.RedisUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 接收数据service类 切片查询 按设备类型查询
+ *
+ * @author whc
+ */
+@Service
+@Slf4j
+public class ReceiveAllDataService extends BaseService {
+  @Autowired
+  ElectricFieldService electricFieldService;
+  @Autowired
+  WindTowerInfoService windTowerInfoService;
+  @Autowired
+  WindTurbineInfoService windTurbineInfoService;
+  @Autowired
+  WeatherStationInfoService weatherStationInfoService;
+  @Autowired
+  InverterInfoService inverterInfoService;
+  @Autowired
+  DataPointService dataPointService;
+  @Autowired
+  WeatherStationStatusDataRepository weatherStationStatusDataRepository;
+  @Autowired
+  InverterStatusDataRepository inverterStatusDataRepository;
+  @Autowired
+  WindTurbineStatusDataRepository windTurbineStatusDataRepository;
+  @Autowired
+  WindTowerStatusDataRepository windTowerStatusDataRepository;
+  @Autowired
+  RedisUtils redisUtils;
+  @Autowired
+  EquipmentAttributeService equipmentAttributeService;
+  @Autowired
+  PowerStationStatusDataRepository powerStationStatusDataRepository;
+  @Autowired
+  PowerStationDataPackerContainer powerStationDataPackerContainer;
+  @Autowired
+  SysAlarmService sysAlarmService;
+  @Value("${receive.ip}")
+  private String ip;
+  @Value("${receive.port}")
+  private  String port;
+  @Value("${receive.path}")
+  private  String path;
+  @Value("${receive.mode}")
+  private String mode;
+  private  final String Active = "activePower";
+
+  //请求超时时间,这个时间定义了socket读数据的超时时间,也就是连接到服务器之后到从服务器获取响应数据需要等待的时间,发生超时,会抛出SocketTimeoutException异常。
+  private static final int SOCKET_TIME_OUT = 5000;
+  //连接超时时间,这个时间定义了通过网络与服务器建立连接的超时时间,也就是取得了连接池中的某个连接之后到接通目标url的连接等待时间。发生超时,会抛出ConnectionTimeoutException异常
+  private static final int CONNECT_TIME_OUT = 3000;
+
+
+
+  public void receive() {
+    List<ElectricField> electricFieldList = electricFieldService.getAll();
+    long endTime =  DateTimeUtil.getCurrentTimeForMinute().getTime();
+    //每个场站请求一次
+    for (ElectricField electricField : electricFieldList) {
+      log.info(electricField.getName() + "开始请求数据");
+      try {
+
+        //切片查询
+        if (electricField.getElectricFieldTypeEnum().getCode() == 1) {
+          //按设备类型请求
+          List<WeatherStationInfo> weatherStationInfoList = weatherStationInfoService.get(electricField.getStationCode());
+          List<InverterInfo> inverterInfoList = inverterInfoService.getByStationCode(electricField.getStationCode());
+          //气象站
+          if(!weatherStationInfoList.isEmpty()){
+            //设备编号
+            String equipmentNo = "";
+            //测点id
+            String sensorIds = "";
+            List<DataPoint> dataPointList;
+            for (WeatherStationInfo weatherStationInfo : weatherStationInfoList) {
+              equipmentNo += weatherStationInfo.getEquipmentNo()+",";
+            }
+            dataPointList = dataPointService.getByEquipmentType(weatherStationInfoList.get(0).getEquipmentType());
+            for (DataPoint dataPoint : dataPointList){
+              sensorIds += dataPoint.getMeasuringPoint()+",";
+            }
+            equipmentNo = equipmentNo.substring(0,equipmentNo.length()-1);
+            sensorIds = sensorIds.substring(0,sensorIds.length()-1);
+            map(electricField,endTime,equipmentNo,weatherStationInfoList.get(0).getEquipmentType(),sensorIds);
+          }
+
+          //逆变器
+          if (!inverterInfoList.isEmpty()){
+            //设备编号
+            String equipmentNo = "";
+            //测点id
+            String sensorIds = "";
+            List<DataPoint> dataPointList;
+            for (InverterInfo inverterInfo : inverterInfoList) {
+              equipmentNo += inverterInfo.getEquipmentNo()+",";
+             }
+            dataPointList = dataPointService.getByEquipmentType(inverterInfoList.get(0).getEquipmentType());
+            for (DataPoint dataPoint : dataPointList){
+              sensorIds += ","+dataPoint.getMeasuringPoint()+",";
+            }
+            equipmentNo = equipmentNo.substring(0,equipmentNo.length()-1);
+            sensorIds = sensorIds.substring(0,sensorIds.length()-1);
+            map(electricField, endTime,equipmentNo,inverterInfoList.get(0).getEquipmentType(),sensorIds);
+
+          }
+
+        } else {
+          List<WindTowerInfo> windTowerInfoList = windTowerInfoService.get(electricField.getStationCode());
+
+          List<WindTurbineInfo> windTurbineInfoList = windTurbineInfoService.getByStationCode(electricField.getStationCode());
+          //测风塔
+          if(!windTowerInfoList.isEmpty()){
+            //设备编号
+            String equipmentNo = "";
+            //测点id
+            String sensorIds = "";
+            List<DataPoint> dataPointList;
+            for (WindTowerInfo windTowerInfo : windTowerInfoList) {
+              equipmentNo += windTowerInfo.getEquipmentNo()+",";
+            }
+            dataPointList = dataPointService.getByEquipmentType(windTowerInfoList.get(0).getEquipmentType());
+            for (DataPoint dataPoint : dataPointList){
+              sensorIds += dataPoint.getMeasuringPoint()+",";
+            }
+            equipmentNo = equipmentNo.substring(0,equipmentNo.length()-1);
+            sensorIds = sensorIds.substring(0,sensorIds.length()-1);
+            map(electricField,endTime,equipmentNo,windTowerInfoList.get(0).getEquipmentType(),sensorIds);
+          }
+          //风机
+          if(!windTurbineInfoList.isEmpty()){
+            //设备编号
+            String equipmentNo = "";
+            //测点id
+            String sensorIds = "";
+            List<DataPoint> dataPointList;
+            for (WindTurbineInfo windTurbineInfo : windTurbineInfoList) {
+              equipmentNo += windTurbineInfo.getEquipmentNo()+",";
+            }
+            dataPointList = dataPointService.getByEquipmentType(windTurbineInfoList.get(0).getEquipmentType());
+            for (DataPoint dataPoint : dataPointList){
+              sensorIds += dataPoint.getMeasuringPoint()+",";
+            }
+            equipmentNo = equipmentNo.substring(0,equipmentNo.length()-1);
+            sensorIds = sensorIds.substring(0,sensorIds.length()-1);
+            map(electricField,endTime,equipmentNo,windTurbineInfoList.get(0).getEquipmentType(),sensorIds);
+          }
+        }
+        savePowerStationStatusData(electricField);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private void map(ElectricField electricField, Long endTime, String equipmentNo,EquipmentTypeEnum equipmentType,String sensorIds) {
+    HashMap<String, Object> paramMap = new HashMap<>();
+    paramMap.put("deviceIds", equipmentNo);
+    paramMap.put("timestamp", endTime);
+    paramMap.put("sensorIds",sensorIds);
+    String body = httpClient(paramMap,electricField);
+    log.debug(body);
+    AnalysisData(body, equipmentType, electricField);
+  }
+
+  /**
+   * 解析数据
+   *
+   * @param body          所有数据
+   * @param equipmentType 设备类型
+   * @param electricField 场站对象
+   */
+  public void AnalysisData(String body, EquipmentTypeEnum equipmentType, ElectricField electricField) {
+    try {
+      List<DataPoint> dataPointList = dataPointService.getByEquipmentType(equipmentType);
+      JSONObject jsonObject= JSON.parseObject(body);
+      JSONArray jsonResults = (JSONArray) jsonObject.get("results");
+      if (!jsonResults.isEmpty()){
+        for(int i = 0 ; i < jsonResults.size() ; i ++){
+          //results
+          JSONObject jsonObj = jsonResults.getJSONObject(i);
+          //设备id
+          String equipmentId  = (String) jsonObj.get("deviceId");
+          //rows
+          JSONArray jsonRows = (JSONArray) jsonObj.get("rows");
+          //所有设备点位
+          JSONArray sensorIds = (JSONArray) jsonObj.get("sensorIds");
+          JSONObject values;
+          Map<String, String> map = new HashMap<>();
+          SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+          if (!jsonRows.isEmpty()){
+            values = jsonRows.getJSONObject(0);
+            //时间戳
+            String time = values.getString("timestamp");
+            //所有数据
+            JSONArray jsonValues = (JSONArray) values.get("values");
+            long timestamp = Long.parseLong(time);
+            map.put("stationCode",electricField.getStationCode());
+            map.put("equipmentId", equipmentId);
+            map.put("time", simpleDateFormat.format(timestamp));
+            for (DataPoint dataPoint : dataPointList) {
+              for (int j = 0; j < sensorIds.size(); j++) {
+                if (dataPoint.getMeasuringPoint().equals(sensorIds.get(j))) {
+                  if(jsonValues.get(j) == null ){
+                    map.put(dataPoint.getEquipmentAttribute().getFieldName(), "-99");
+                  }else{
+                    map.put(dataPoint.getEquipmentAttribute().getFieldName(), jsonValues.get(j).toString());
+                  }
+                }
+              }
+            }
+            String STATUS = "status";
+            switch (map.get(STATUS)) {
+              case "1.00":
+              case "true":
+                //运行
+                map.put(STATUS, "1");
+                break;
+              case "2.00":
+                //待机
+                map.put(STATUS, "2");
+                break;
+              case "3.00":
+                //停用
+                map.put(STATUS, "3");
+                break;
+              case "4.00":
+              case "false":
+                //通讯中断
+                map.put(STATUS, "4");
+                break;
+              case "5.00":
+              case "-99":
+                map.put(STATUS, "5");
+                break;
+              default:
+                break;
+            }
+
+            String ap = map.get(Active);
+            switch(equipmentType.getCode()){
+              case 1:
+                //气象站
+                redisUtils.hmset("qxz-" + electricField.getStationCode() + "-" + equipmentId, map);
+                WeatherStationStatusData weatherStationStatusData = JSON.parseObject(JSON.toJSONString(map), WeatherStationStatusData.class);
+                weatherStationStatusDataRepository.save(weatherStationStatusData);
+                log.info(map.get("time")+"qxz-" + electricField.getStationCode() + "-" + equipmentId+"已存入");
+                break;
+              case 2:
+                //逆变器
+                if (ap != null && map.get("electricalCurrent") != null) {
+                  //有功
+                  BigDecimal activePower = new BigDecimal(ap);
+                  //电流
+                  BigDecimal electricalCurrent = new BigDecimal(map.get("electricalCurrent"));
+                  //有功/电流=电压
+                  BigDecimal voltage = activePower.divide(electricalCurrent, 2, BigDecimal.ROUND_HALF_UP);
+                  map.put("voltage", voltage.toString());
+                }
+                redisUtils.hmset("nbq-" + electricField.getStationCode() + "-" + equipmentId, map);
+                InverterStatusData inverterStatusData = JSON.parseObject(JSON.toJSONString(map), InverterStatusData.class);
+                inverterStatusDataRepository.save(inverterStatusData);
+                log.info(map.get("time")+"nbq-" + electricField.getStationCode() + "-" + equipmentId+"已存入");
+                break;
+              case 3:
+                redisUtils.hmset("fj-" + electricField.getStationCode() + "-" + equipmentId, map);
+                WindTurbineStatusData windTurbineStatusData = JSON.parseObject(JSON.toJSONString(map), WindTurbineStatusData.class);
+                windTurbineStatusDataRepository.save(windTurbineStatusData);
+                log.info("fj-" + electricField.getStationCode() + "-" + equipmentId+"已存入");
+                break;
+              case 4:
+                redisUtils.hmset("cft-" + electricField.getStationCode() + "-" + equipmentId, map);
+                WindTowerStatusData windTowerStatusData = JSON.parseObject(JSON.toJSONString(map), WindTowerStatusData.class);
+                windTowerStatusDataRepository.save(windTowerStatusData);
+                log.info("cft-" + electricField.getStationCode() + "-" + equipmentId+"已存入");
+                break;
+              default:
+                break;
+            }
+          }else {
+            log.info(electricField.getStationCode()+ "-" +equipmentType.getMessage() + "-" + equipmentId+"无数据,不进行任何操作");
+          }
+        }
+
+      }else {
+        log.info(electricField.getStationCode()+ "-" +equipmentType.getMessage() + "-" +"对方没有该类型设备,请核对设备信息");
+      }
+    } catch (Exception e) {
+      log.info(electricField.getName()+equipmentType.getMessage()+"程序解析数据异常");
+      // 进行告警
+      String errorInfo = electricField.getStationCode()+equipmentType.getMessage()+"程序解析数据异常";
+      String name = "程序解析数据异常";
+      String describe = "";
+      String solution = "";
+      sysAlarmService.saveSysAlarm(AlarmTypeEnum.E5, name, describe, errorInfo, solution,electricField.getStationCode());
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * 发http请求
+   * @param paramMap 参数
+   * @return 返回的数据
+   */
+  public String httpClient( HashMap<String, Object> paramMap,ElectricField electricField){
+    String body = "";
+    try{
+      CloseableHttpClient  httpClient = HttpClientBuilder.create().build();
+      HttpPost httpPost = new HttpPost("http://"+ip+":"+port+path+mode);
+
+      StringEntity entity = new StringEntity(JSON.toJSONString(paramMap),"UTF-8");
+      httpPost.setEntity(entity);
+      //设置请求超时时间,链接超时时间
+      RequestConfig reqConfig = RequestConfig.custom().setSocketTimeout(SOCKET_TIME_OUT).setConnectTimeout(CONNECT_TIME_OUT).build();
+      httpPost.setConfig(reqConfig);
+      httpPost.setHeader("Content-Type", "application/json; charset=utf-8");
+      CloseableHttpResponse response;
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      body = EntityUtils.toString(responseEntity, StandardCharsets.UTF_8);
+    }catch (RuntimeException | IOException e){
+      String errorInfo = "发送http请求数据异常";
+      log.info(errorInfo);
+      log.info("所用线程"+Thread.currentThread().getName());
+      log.info("停止请求数据");
+      // 进行告警
+      String describe = "";
+      String solution = "停止请求数据";
+      sysAlarmService.saveSysAlarm(AlarmTypeEnum.E5, errorInfo, describe, errorInfo, solution,electricField.getStationCode());
+
+      Thread.currentThread().stop();
+
+
+    }
+    return body;
+  }
+
+  public void savePowerStationStatusData(ElectricField electricField) {
+    try {
+      log.info("开始计算实际功率");
+      BigDecimal pssd = new BigDecimal("0");
+      if (electricField.getElectricFieldTypeEnum().getCode() == 1) {
+        List<InverterInfo> inverterInfoList = inverterInfoService.getByStationCode(electricField.getStationCode());
+        for (InverterInfo inverterInfo : inverterInfoList) {
+          Map<String, String> getMap = redisUtils.hgetall("nbq-" + electricField.getStationCode() + "-" + inverterInfo.getId());
+          String activePower = getMap.get(Active);
+          if (activePower != null) {
+            pssd = pssd.add(new BigDecimal(activePower));
+          }
+        }
+      } else {
+        List<WindTurbineInfo> windTurbineInfoList = windTurbineInfoService.getByStationCode(electricField.getStationCode());
+        for (WindTurbineInfo windTurbineInfo : windTurbineInfoList) {
+          Map<String, String> getMap = redisUtils.hgetall("fj-" + electricField.getStationCode() + "-" + windTurbineInfo.getId());
+          String activePower = getMap.get(Active);
+          if (activePower != null) {
+            pssd = pssd.add(new BigDecimal(activePower));
+          }
+        }
+      }
+      if (pssd.compareTo(new BigDecimal("0")) > 0) {
+        pssd = pssd.divide(new BigDecimal("1000"), 2, BigDecimal.ROUND_HALF_UP);
+      }
+      PowerStationStatusData p = powerStationDataPackerContainer.getDataPacker(electricField.getStationCode()).packageData(pssd);
+      powerStationStatusDataRepository.save(p);
+      //对象转map
+      Map<String, String> map    = JSON.parseObject(JSON.toJSONString(p), new TypeReference<Map<String, String>>() {
+      });
+      redisUtils.hmset("power-" + electricField.getStationCode(), map);
+      log.info("实际功率结束");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}

+ 9 - 8
ipfcst-console/src/main/java/com/jiayue/ipfcst/console/service/ReceiveDataService.java

@@ -5,14 +5,12 @@ import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
 import com.jiayue.ipfcst.common.core.util.DateTimeUtil;
-import com.jiayue.ipfcst.common.data.constant.enums.AlarmStatusEnum;
 import com.jiayue.ipfcst.common.data.constant.enums.AlarmTypeEnum;
 import com.jiayue.ipfcst.common.data.constant.enums.EquipmentTypeEnum;
 import com.jiayue.ipfcst.common.data.entity.*;
 import com.jiayue.ipfcst.common.data.repository.*;
 import com.jiayue.ipfcst.common.data.service.BaseService;
 import com.jiayue.ipfcst.console.util.RedisUtils;
-import com.sun.istack.internal.NotNull;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -34,7 +32,7 @@ import java.text.SimpleDateFormat;
 import java.util.*;
 
 /**
- * 接收数据service类
+ * 接收数据service类 通用接口 单设备查询
  *
  * @author whc
  */
@@ -74,10 +72,12 @@ public class ReceiveDataService extends BaseService {
   @Value("${receive.ip}")
   private String ip;
   @Value("${receive.port}")
-  private  String port;
+  private String port;
   @Value("${receive.path}")
-  private  String path;
-  private  final String Active = "activePower";
+  private String path;
+  @Value("${receive.mode}")
+  private String mode;
+  private final String Active = "activePower";
 
   //请求超时时间,这个时间定义了socket读数据的超时时间,也就是连接到服务器之后到从服务器获取响应数据需要等待的时间,发生超时,会抛出SocketTimeoutException异常。
   private static final int SOCKET_TIME_OUT = 5000;
@@ -196,9 +196,10 @@ public class ReceiveDataService extends BaseService {
               break;
             case "4.00":
             case "false":
-              //故障
+              //通讯中断
               map.put(STATUS, "4");
               break;
+            case "5.00":
             case "-99":
               map.put(STATUS, "5");
               break;
@@ -273,7 +274,7 @@ public class ReceiveDataService extends BaseService {
     String body = "";
     try{
       CloseableHttpClient  httpClient = HttpClientBuilder.create().build();
-      HttpPost httpPost = new HttpPost("http://"+ip+":"+port+path);
+      HttpPost httpPost = new HttpPost("http://"+ip+":"+port+path+mode);
 
       StringEntity entity = new StringEntity(JSON.toJSONString(paramMap),"UTF-8");
       httpPost.setEntity(entity);

+ 2 - 2
ipfcst-console/src/main/resources/application.yml

@@ -122,5 +122,5 @@ fileupload:
 receive:
   ip: 10.220.57.13
   port: 7085
-  path: /dataService/v1/getTaosData
-
+  path: /dataService/v1/
+  mode: slice