|
@@ -8,24 +8,60 @@ import os.path
|
|
|
import types
|
|
|
import pandas as pd
|
|
|
from pathlib import Path
|
|
|
+from datetime import datetime, timedelta
|
|
|
from app.common.config import logger, parser
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
from functools import partial
|
|
|
|
|
|
|
|
|
class MaterialLoader:
|
|
|
- def __init__(self, target_dir, lazy_load=True):
|
|
|
+ def __init__(self, opt, lazy_load=True):
|
|
|
self.lazy_load = lazy_load
|
|
|
self._data_cache = {}
|
|
|
- self.opt = parser.parse_args_and_yaml()
|
|
|
- self.target_dir = target_dir
|
|
|
+ self.opt = opt
|
|
|
+ self.target_dir = os.path.join(opt.dqyc_base_path, opt.input_file)
|
|
|
+ self.target_dir_cdq = os.path.join(opt.cdqyc_base_path, opt.moment)
|
|
|
|
|
|
def wrapper_path(self, station_id, spec):
|
|
|
- return f"{Path(self.target_dir)/station_id/spec}.txt"
|
|
|
+ return f"{self.target_dir/station_id/spec}.txt"
|
|
|
|
|
|
def wrapper_path_cdq(self, area_id, spec):
|
|
|
return f"{self.target_dir/area_id/spec}.txt"
|
|
|
|
|
|
+ def _load_cdq_history_rp(self):
|
|
|
+ """
|
|
|
+ 加载超短期历史实发功率
|
|
|
+ """
|
|
|
+ dt = datetime.strptime(self.opt.moment, '%Y%m%d_%H%M')
|
|
|
+ dts = [x.strftime("%Y%m%d_%H%M") for x in [dt, dt-timedelta(minutes=15), dt-timedelta(minutes=30)]]
|
|
|
+ def wrapper_path_cdq_his(points):
|
|
|
+ segs = self.opt.cdqyc_base_path.split('/')
|
|
|
+ cdq_his = []
|
|
|
+ for point in points:
|
|
|
+ his_file = f'{self.opt.doc_cdq_mapping['history_power']}_{segs[0]}_{segs[1]}_{point}.txt'
|
|
|
+ cdq_his.append(os.path.join(self.opt.cdqyc_base_path, his_file))
|
|
|
+ return cdq_his
|
|
|
+ cdq_his_rp = []
|
|
|
+ for dt_text, his_file in zip(dts, wrapper_path_cdq_his(dts)):
|
|
|
+ cdq_his_df = pd.read_csv(his_file)
|
|
|
+ cdq_his_df['moment'] = dt_text
|
|
|
+ cdq_his_rp.append(cdq_his_df)
|
|
|
+ cdq_his_rp = pd.concat(cdq_his_rp, axis=0)
|
|
|
+ return cdq_his_rp
|
|
|
+
|
|
|
+ def _load_dq_res(self, area_id):
|
|
|
+ """
|
|
|
+ 加载短期预测结果
|
|
|
+ """
|
|
|
+ dt_str = datetime.strptime(self.opt.moment, '%Y%m%d_%H%M').strftime("%Y-%m-%d")
|
|
|
+ dq_path = os.path.join(self.opt.dqyc_base_path, area_id, dt_str, 'IN')
|
|
|
+ all_stations = [str(child.parts[-1]) for child in Path(str(dq_path)).iterdir() if child.is_dir()]
|
|
|
+ dqs = [pd.read_csv(os.path.join(str(dq_path), station, self.opt.doc_mapping['out'])) for station in all_stations]
|
|
|
+ dqs.append(pd.read_csv(os.path.join(str(dq_path), self.opt.doc_mapping['area_out'])))
|
|
|
+ dq = pd.concat(dqs)
|
|
|
+ return dq
|
|
|
+
|
|
|
+
|
|
|
def _load_material(self, station_id):
|
|
|
"""核心数据加载方法"""
|
|
|
# 根据您的原始代码逻辑简化的加载流程
|
|
@@ -79,8 +115,8 @@ class MaterialLoader:
|
|
|
print(f"Error loading {station_id}: {str(e)}")
|
|
|
return None
|
|
|
|
|
|
- def _load_material_cdq(self, area_id, moment):
|
|
|
- """核心数据加载方法"""
|
|
|
+ def _load_material_cdq(self, area_id):
|
|
|
+ """超短期核心数据加载方法"""
|
|
|
# 根据您的原始代码逻辑简化的加载流程
|
|
|
try:
|
|
|
basic = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_cdq_mapping['basic']), sep=r'\s+', header=0)
|
|
@@ -88,45 +124,33 @@ class MaterialLoader:
|
|
|
plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
|
|
|
assert plant_type == 0 or plant_type == 1
|
|
|
# 根据电站类型加载数据
|
|
|
-
|
|
|
- if self.opt.switch_nwp_owner:
|
|
|
- nwp_v, nwp_v_h = nwp_own, nwp_own_h
|
|
|
# 如果是风电
|
|
|
if plant_type == 0:
|
|
|
- station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
|
|
|
- station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
|
|
|
- nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0)
|
|
|
- nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0)
|
|
|
+ station_info = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
|
|
|
+ station_info_d = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
|
|
|
cap = float(station_info.loc[0, 'PlantCap'])
|
|
|
- if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf'])).exists():
|
|
|
- env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0)
|
|
|
- else:
|
|
|
- env = None
|
|
|
+ # 去短期预测结果中加载当日短期
|
|
|
+ dq = self._load_dq_res(area_id)
|
|
|
+ cdq_his_rp = self._load_cdq_history_rp()
|
|
|
+
|
|
|
# 如果是光伏
|
|
|
else:
|
|
|
- station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
|
|
|
- station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
|
|
|
- nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0)
|
|
|
- nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0)
|
|
|
+ station_info = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
|
|
|
+ station_info_d = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
|
|
|
cap = float(station_info.loc[0, 'PlantCap'])
|
|
|
- if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf'])).exists():
|
|
|
- env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0)
|
|
|
- else:
|
|
|
- env = None
|
|
|
+ # 去短期预测结果中加载当日短期
|
|
|
+ dq = self._load_dq_res(area_id)
|
|
|
+ cdq_his_rp = self._load_cdq_history_rp()
|
|
|
|
|
|
return types.SimpleNamespace(**{
|
|
|
'station_info': station_info,
|
|
|
'station_info_d': station_info_d,
|
|
|
- 'nwp': nwp,
|
|
|
- 'nwp_h': nwp_h,
|
|
|
- 'power': power,
|
|
|
- 'nwp_v': nwp_v,
|
|
|
- 'nwp_v_h': nwp_v_h,
|
|
|
- 'env': env,
|
|
|
+ 'dq': dq,
|
|
|
+ 'cdq_his_rp': cdq_his_rp,
|
|
|
'cap': cap
|
|
|
})
|
|
|
except Exception as e:
|
|
|
- print(f"Error loading {station_id}: {str(e)}")
|
|
|
+ print(f"CDQ Error loading {area_id}: {str(e)}")
|
|
|
return None
|
|
|
|
|
|
def get_material(self, station_id):
|