|
@@ -1,24 +1,36 @@
|
|
|
package com.cpp.web.service.datafactory.impl;
|
|
|
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
+import com.cpp.common.utils.spring.SpringUtils;
|
|
|
import com.cpp.system.service.ISysConfigService;
|
|
|
import com.cpp.web.domain.datafactory.BaseParsing;
|
|
|
import com.cpp.web.domain.datafactory.ParsingDq;
|
|
|
import com.cpp.web.domain.datafactory.dto.ParsingConfParam;
|
|
|
import com.cpp.web.domain.datafactory.dto.ParsingResultDto;
|
|
|
import com.cpp.web.domain.datafactory.enums.FileTypeEnum;
|
|
|
+import com.cpp.web.domain.enums.UploadStatusEnum;
|
|
|
+import com.cpp.web.domain.regulation.TempShortRegulation;
|
|
|
import com.cpp.web.domain.station.ForecastPowerShortTermRegulation;
|
|
|
+import com.cpp.web.domain.station.ForecastPowerShortTermSend;
|
|
|
import com.cpp.web.domain.station.ForecastPowerShortTermStation;
|
|
|
import com.cpp.web.service.datafactory.DataStore;
|
|
|
import com.cpp.web.service.datafactory.ParsingDqService;
|
|
|
import com.cpp.web.service.datafactory.ParsingInterface;
|
|
|
import com.cpp.web.service.datafactory.ScheduledHelper;
|
|
|
+import com.cpp.web.service.regulation.TempShortRegulationService;
|
|
|
import com.cpp.web.service.station.ForecastPowerShortTermRegulationService;
|
|
|
+import com.cpp.web.service.station.ForecastPowerShortTermSendService;
|
|
|
import com.cpp.web.utils.DateTimeUtil;
|
|
|
import com.cpp.web.utils.ParsingFieldUtil;
|
|
|
import com.cpp.web.utils.ParsingFileUtil;
|
|
|
import com.cpp.web.utils.ParsingUtil;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.commons.lang3.time.DateFormatUtils;
|
|
|
+import org.apache.commons.lang3.time.DateUtils;
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
@@ -28,6 +40,7 @@ import java.text.SimpleDateFormat;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Date;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
@@ -49,6 +62,10 @@ public class ParsingRdqServiceImpl implements ParsingInterface {
|
|
|
|
|
|
private List<ParsingDq> parsingDqs = new ArrayList<>();
|
|
|
|
|
|
+ private final TempShortRegulationService tempShortRegulationService;
|
|
|
+
|
|
|
+ private final ForecastPowerShortTermSendService forecastPowerShortTermSendService;
|
|
|
+
|
|
|
@Override
|
|
|
public boolean save(BaseParsing parsing) {
|
|
|
return true;
|
|
@@ -79,6 +96,7 @@ public class ParsingRdqServiceImpl implements ParsingInterface {
|
|
|
List<String> fileContent = ParsingFileUtil.getFileContent(file);
|
|
|
if (parsingDqInfo.getDataType().equals(ParsingFieldUtil.MULTI)) {//多行操作
|
|
|
Date forecastTime = parsingForecastTime(parsingDqInfo.getForecastTime(), fileContent);
|
|
|
+ String status = parsingUploadStatus("2<=>5<=>status=", fileContent);
|
|
|
Date time = forecastTime;
|
|
|
if (forecastTime != null) {//解析时间成功才可以进行以下操作
|
|
|
if (parsingDqInfo.getFpValue() != null) {
|
|
@@ -103,10 +121,19 @@ public class ParsingRdqServiceImpl implements ParsingInterface {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- scheduleAddCache(forecastPowerShortTermRegulationList,stationCode);
|
|
|
+ scheduleAddCache(forecastPowerShortTermRegulationList, stationCode);
|
|
|
forecastPowerShortTermRegulationService.saveBatch(forecastPowerShortTermRegulationList);
|
|
|
log.info("解析 调控后DQ文件:{} 成功! O(∩_∩)O", file.getName());
|
|
|
parsingResultDto.setStatus("success");
|
|
|
+
|
|
|
+ //使用新线程写入状态,不会影响解析速度
|
|
|
+ SpringUtils.getBean(ThreadPoolTaskExecutor.class).execute(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ verifyRegulation(stationCode, forecastPowerShortTermRegulationList, forecastTime, status);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
} else {
|
|
|
parsingResultDto.setMessage("解析 调控后短期文件时间错误");
|
|
|
log.error("解析 调控后DQ文件时间错误");
|
|
@@ -135,6 +162,61 @@ public class ParsingRdqServiceImpl implements ParsingInterface {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 校验调控后数据是否成功上传
|
|
|
+ *
|
|
|
+ * @param stationCode
|
|
|
+ * @param forecastPowerShortTermRegulationList
|
|
|
+ * @param forecastTime
|
|
|
+ * @param status
|
|
|
+ */
|
|
|
+ private void verifyRegulation(String stationCode, List<ForecastPowerShortTermRegulation> forecastPowerShortTermRegulationList, Date forecastTime, String status) {
|
|
|
+ String dqEndTime = configService.selectConfigByKey("dqEndTime");
|
|
|
+ if (dqEndTime.equals(StringUtils.EMPTY)) {
|
|
|
+ dqEndTime = "08:00";
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Long longTime = forecastTime.getTime() - 86400000L;
|
|
|
+ Date startTime = DateTimeUtil.getDayStartTime(longTime);
|
|
|
+ Date endTime = DateUtils.parseDate(DateFormatUtils.format(startTime, "yyyy-MM-dd " + dqEndTime), "yyyy-MM-dd HH:mm");
|
|
|
+ QueryWrapper<TempShortRegulation> tk_date = new QueryWrapper<TempShortRegulation>().between("tk_date", startTime, endTime).eq("station_code", stationCode);
|
|
|
+ List<TempShortRegulation> tempShortRegulations = tempShortRegulationService.list(tk_date);
|
|
|
+ if (tempShortRegulations.size() > 0) {
|
|
|
+ if (status.equals("success")) {
|
|
|
+ List<ForecastPowerShortTermSend> forecastPowerShortTermSendList = forecastPowerShortTermSendService.findByForecastTimeBetweenAndForecastHowLongAgoAndStationCode(forecastPowerShortTermRegulationList.get(0).getTime().getTime(), forecastPowerShortTermRegulationList.get(forecastPowerShortTermRegulationList.size() - 1).getTime().getTime(), -99, stationCode);
|
|
|
+ Map<Date, List<ForecastPowerShortTermSend>> dateListMap = forecastPowerShortTermSendList.stream().collect(Collectors.groupingBy(ForecastPowerShortTermSend::getTime, Collectors.toList()));
|
|
|
+ Map<Date, List<ForecastPowerShortTermRegulation>> listMap = forecastPowerShortTermRegulationList.stream().collect(Collectors.groupingBy(ForecastPowerShortTermRegulation::getTime, Collectors.toList()));
|
|
|
+ String uploadStatus = UploadStatusEnum.E1.name();
|
|
|
+ for (Map.Entry<Date, List<ForecastPowerShortTermRegulation>> dateListEntry : listMap.entrySet()) {
|
|
|
+ if (dateListMap.containsKey(dateListEntry.getKey())) {
|
|
|
+ if (dateListMap.get(dateListEntry.getKey()).get(0).getFpValue().compareTo(dateListEntry.getValue().get(0).getFpValue()) != 0) {
|
|
|
+ uploadStatus = UploadStatusEnum.E2.name();
|
|
|
+ log.info("回传数据与下发数据不符,调控失败{}", stationCode);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ uploadStatus = UploadStatusEnum.E2.name();
|
|
|
+ log.info("回传数据时间与下发数据时间不符,调控失败{}", stationCode);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (TempShortRegulation tempShortRegulation : tempShortRegulations) {
|
|
|
+ tempShortRegulation.setUploadStatusEnum(uploadStatus);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.info("站端返回上报结果失败,调控失败{}", stationCode);
|
|
|
+ for (TempShortRegulation tempShortRegulation : tempShortRegulations) {
|
|
|
+ tempShortRegulation.setUploadStatusEnum(UploadStatusEnum.E2.name());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tempShortRegulationService.saveOrUpdateBatch(tempShortRegulations);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("验证调控失败", e);
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
protected Date parsingForecastTime(String sign, List<String> fileContent) {
|
|
|
Date forecastTime = null;
|
|
|
try {
|
|
@@ -237,13 +319,14 @@ public class ParsingRdqServiceImpl implements ParsingInterface {
|
|
|
* @param time
|
|
|
*/
|
|
|
public Integer calcHowLongAgo(Date baseTime, Date time) {
|
|
|
- return ((int) (time.getTime() - baseTime.getTime() % 86400000L)) + 1;
|
|
|
+ return ((int) ((time.getTime() - baseTime.getTime()) / 86400000L)) + 1;
|
|
|
}
|
|
|
|
|
|
|
|
|
private final ScheduledHelper scheduledHelper = new ScheduledHelper();
|
|
|
|
|
|
- public void scheduleAddCache(List<ForecastPowerShortTermRegulation> forecastPowerShortTermRegulationList, String stationCode) {
|
|
|
+ public void scheduleAddCache
|
|
|
+ (List<ForecastPowerShortTermRegulation> forecastPowerShortTermRegulationList, String stationCode) {
|
|
|
|
|
|
int ago = 1;
|
|
|
try {
|