#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :material.py
# @Time     :2025/4/29 11:07
# @Author   :David
# @Company  :shenyang JY
- import os.path
- import types
- import pandas as pd
- from pathlib import Path
- from datetime import datetime, timedelta
- from app.common.config import logger, parser
- from concurrent.futures import ThreadPoolExecutor
- from functools import partial
class MaterialLoader:
    """Loads forecasting input material (station/area files) from disk.

    Derives the short-term (``target_dir``) and ultra-short-term
    (``target_dir_cdq``) base directories from the supplied options.
    """

    def __init__(self, opt, lazy_load=True):
        """
        :param opt: configuration namespace (base paths, file-name mappings, moment, ...)
        :param lazy_load: when True, material loaded per station is cached
        """
        self.opt = opt
        self.lazy_load = lazy_load
        # station_id -> loaded material (used only in lazy mode)
        self._data_cache = {}
        # short-term inputs live under <dqyc_base_path>/<input_file>
        self.target_dir = os.path.join(opt.dqyc_base_path, opt.input_file)
        # ultra-short-term inputs live under <cdqyc_base_path>/<moment>
        self.target_dir_cdq = os.path.join(opt.cdqyc_base_path, opt.moment)
- def wrapper_path(self, spec):
- return os.path.join(str(self.target_dir), f"{spec}.txt")
- def wrapper_path_cdq(self, spec):
- return os.path.join(str(self.target_dir_cdq), 'IN', f"{spec}.txt")
- def _load_cdq_history_rp(self, area_id, plant_type):
- """
- 加载超短期历史实发功率:加载当前时刻和前2个时刻的历史实际功率
- """
- dt = datetime.strptime(self.opt.moment, '%Y%m%d_%H%M')
- dts = [x.strftime("%Y%m%d_%H%M") for x in [dt, dt-timedelta(minutes=15), dt-timedelta(minutes=30)]]
- def wrapper_path_cdq_his(points):
- cdq_his = []
- for point in points:
- his_file = f'{self.opt.doc_cdq_mapping['history_power']}_{area_id}_{plant_type}_{point}.txt'
- cdq_his.append(os.path.join(self.opt.cdqyc_base_path, his_file))
- return cdq_his
- cdq_his_rp = []
- for dt_text, his_file in zip(dts, wrapper_path_cdq_his(dts)):
- dt = datetime.strptime(dt_text, '%Y%m%d_%H%M').strftime("%Y-%m-%d %H:%M:%S")
- cdq_his_df = pd.read_csv(his_file, sep=r'\s+', header=0)
- cdq_his_df['moment'] = dt
- cdq_his_rp.append(cdq_his_df)
- cdq_his_rp = pd.concat(cdq_his_rp, axis=0)
- return cdq_his_rp
- def _load_dq_res(self, area_id, plant_type, begin_time):
- """
- 加载短期预测结果:场站级短期和区域级短期
- """
- task_id = str(self.opt.dq_area_task_mapping[int(area_id)][int(plant_type)])
- # 找时间:超短期时间目录时间(当前时间),超短期预测起始时间,减去一天,是短期时间目录的时间(当前时间)
- dt_str = (datetime.strptime(begin_time, '%Y-%m-%d %H:%M:%S') - timedelta(days=1)).strftime("%Y-%m-%d")
- dq_path = os.path.join(str(self.opt.dqyc_base_path), task_id, area_id, dt_str, 'IN')
- dq_path_out = os.path.join(str(self.opt.dqyc_base_path), task_id, area_id, dt_str, 'OUT')
- all_stations = [str(child.parts[-1]) for child in Path(str(dq_path)).iterdir() if child.is_dir()]
- dqs = [pd.read_csv(os.path.join(str(dq_path), station, self.opt.doc_mapping['out']+'.txt'), sep=r'\s+', header=0) for station in all_stations]
- dq = pd.concat(dqs)
- dq_area = pd.read_csv(os.path.join(str(dq_path_out), self.opt.doc_mapping['area_out']+'.txt'), sep=r'\s+', header=0)
- return dq, dq_area
def _load_material(self, station_id):
    """Load all short-term input material for one station.

    Reads basic info and actual power from ``target_dir``, both the
    vendor ("0/") and owner ("1/") NWP variants (owner preferred when
    ``opt.switch_nwp_owner`` is set), then the plant-type specific
    station info, NWP and optional environment files
    (PlantType 0 = wind, 1 = solar).

    :param station_id: station identifier; used only in the error
        message — the caller must already have pointed ``target_dir``
        at this station's directory
    :return: SimpleNamespace(station_info, station_info_d, nwp, nwp_h,
        power, nwp_v, nwp_v_h, env, cap); ``env`` is None when the
        environment file is absent; None is returned on any failure
    """
    # Loading flow simplified from the original implementation.
    try:
        basic = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['basic']), sep=r'\s+', header=0)
        power = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['power']), sep=r'\s+', header=0)
        # PlantType row of the basic-info file: 0 = wind, 1 = solar.
        plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
        assert plant_type == 0 or plant_type == 1
        # Load data according to plant type.
        # Vendor ("0/") and owner ("1/") NWP, current and history.
        nwp_v = pd.read_csv(self.wrapper_path(f"0/{self.opt.doc_mapping['nwp_v']}"), sep=r'\s+', header=0)
        nwp_v_h = pd.read_csv(self.wrapper_path(f"0/{self.opt.doc_mapping['nwp_v_h']}"), sep=r'\s+', header=0)
        nwp_own = pd.read_csv(self.wrapper_path(f"1/{self.opt.doc_mapping['nwp_own']}"), sep=r'\s+', header=0)
        nwp_own_h = pd.read_csv(self.wrapper_path(f"1/{self.opt.doc_mapping['nwp_own_h']}"), sep=r'\s+', header=0)
        if self.opt.switch_nwp_owner:
            # Prefer the owner's NWP when configured.
            nwp_v, nwp_v_h = nwp_own, nwp_own_h
        # Wind farm
        if plant_type == 0:
            station_info = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
            station_info_d = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
            nwp = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0)
            nwp_h = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0)
            cap = float(station_info.loc[0, 'PlantCap'])
            # Environment file is optional.
            if Path(self.wrapper_path(self.opt.doc_mapping['env_wf'])).exists():
                env = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0)
            else:
                env = None
        # Solar plant
        else:
            station_info = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
            station_info_d = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
            nwp = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0)
            nwp_h = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0)
            cap = float(station_info.loc[0, 'PlantCap'])
            # Environment file is optional.
            if Path(self.wrapper_path(self.opt.doc_mapping['env_sf'])).exists():
                env = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0)
            else:
                env = None
        return types.SimpleNamespace(**{
            'station_info': station_info,
            'station_info_d': station_info_d,
            'nwp': nwp,
            'nwp_h': nwp_h,
            'power': power,
            'nwp_v': nwp_v,
            'nwp_v_h': nwp_v_h,
            'env': env,
            'cap': cap
        })
    except Exception as e:
        print(f"Error loading {station_id}: {str(e)}")
        return None
- def get_material_cdq(self):
- """超短期核心数据加载方法"""
- # 根据您的原始代码逻辑简化的加载流程
- try:
- area_id, plant_type = self.opt.input_file.split('/')[0], self.opt.input_file.split('/')[1]
- self.target_dir_cdq = os.path.join(str(self.target_dir_cdq), area_id, plant_type)
- basic = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['basic']), sep=r'\s+', header=0)
- basic_area = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['basic_area']), sep=r'\s+', header=0)
- plant_type1 = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
- begin_time = str(basic.loc[basic['PropertyID'].tolist().index('ForecastBeginTime'), 'Value'])
- end_time = str(basic.loc[basic['PropertyID'].tolist().index('ForecastEndTime'), 'Value'])
- # assert plant_type == 0 or plant_type == 1
- dq = 1
- # 根据电站类型加载数据
- # 如果是风电
- if int(plant_type) == 0:
- station_info = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['station_info_w']), sep=r'\s+', header=0)
- station_info_d = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['station_info_d_w']), sep=r'\s+', header=0)
- cap = float(station_info.loc[0, 'PlantCap'])
- # 去短期预测结果中加载当日短期
- dq, dq_area = self._load_dq_res(area_id, plant_type, begin_time)
- cdq_his_rp = self._load_cdq_history_rp(area_id, plant_type)
- # 如果是光伏
- else:
- station_info = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['station_info_s']), sep=r'\s+', header=0)
- station_info_d = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['station_info_d_s']), sep=r'\s+', header=0)
- cap = float(station_info.loc[0, 'PlantCap'])
- # 去短期预测结果中加载当日短期
- dq, dq_area = self._load_dq_res(area_id, plant_type, begin_time)
- cdq_his_rp = self._load_cdq_history_rp(area_id, plant_type)
- return types.SimpleNamespace(**{
- 'station_info': station_info,
- 'station_info_d': station_info_d,
- 'dq': dq,
- 'dq_area': dq_area,
- 'cdq_his_rp': cdq_his_rp,
- 'cap': cap,
- 'begin_time': begin_time,
- 'end_time': end_time
- })
- except Exception as e:
- print(f"CDQ Error loading: {str(e)}")
- return None
- def get_material(self, station_id):
- if self.lazy_load:
- if station_id not in self._data_cache:
- self.target_dir = os.path.join(str(self.target_dir), station_id)
- self._data_cache[station_id] = self._load_material(station_id)
- return self._data_cache[station_id]
- else:
- return self._load_material(station_id)
def add_weights(self, data_objects):
    """Scale the NWP frames by installed capacity.

    Multiplies every forecast-value column of nwp / nwp_h / nwp_v /
    nwp_v_h by ``cap``; identifier and time columns are left untouched.
    Note this is per-frame scaling (the original docstring said
    "weighted sum", which the code never did), and the input frames
    are not mutated.

    :param data_objects: namespace with nwp, nwp_h, nwp_v, nwp_v_h and cap
    :return: dict with the four scaled frames and the (unchanged) cap
    """
    # Columns that identify the row rather than carry forecast values.
    id_columns = {'PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime'}

    def _scale(df, weight):
        # Scale all value columns of one frame on a copy.
        scaled = df.copy()
        value_columns = [col for col in df.columns if col not in id_columns]
        scaled[value_columns] = scaled[value_columns] * weight
        return scaled

    cap = data_objects.cap
    return {
        'nwp': _scale(data_objects.nwp, cap),
        'nwp_h': _scale(data_objects.nwp_h, cap),
        'nwp_v': _scale(data_objects.nwp_v, cap),
        'nwp_v_h': _scale(data_objects.nwp_v_h, cap),
        'cap': cap
    }
- def get_material_region(self):
- try:
- basic = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['basic_area']+'.txt'), sep=r'\s+', header=0)
- power = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['power_area']+'.txt'), sep=r'\s+', header=0)
- plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
- area_id = int(basic.loc[basic['PropertyID'].tolist().index('AreaId'), 'Value'])
- assert plant_type == 0 or plant_type == 1
- area_cap = float(basic.loc[basic['PropertyID'].tolist().index('AreaCap'), 'Value'])
- return types.SimpleNamespace(**{
- 'power': power,
- 'area_cap': area_cap,
- 'area_id': area_id
- })
- except Exception as e:
- print(f"Region Error loading: {str(e)}")
- return None
if __name__ == "__main__":
    # Placeholder entry point; this module is intended to be imported,
    # not executed directly.
    run_code = 0
|