123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # @FileName :material.py
- # @Time :2025/4/29 11:07
- # @Author :David
- # @Company: shenyang JY
- import os.path
- import types
- import pandas as pd
- from pathlib import Path
- from datetime import datetime, timedelta
- from app.common.config import logger, parser
- from concurrent.futures import ThreadPoolExecutor
- from functools import partial
- class MaterialLoader:
- def __init__(self, opt, lazy_load=True):
- self.lazy_load = lazy_load
- self._data_cache = {}
- self.opt = opt
- self.target_dir = os.path.join(opt.dqyc_base_path, opt.input_file)
- self.target_dir_cdq = os.path.join(opt.cdqyc_base_path, opt.moment)
- def wrapper_path(self, station_id, spec):
- return f"{self.target_dir/station_id/spec}.txt"
- def wrapper_path_cdq(self, area_id, spec):
- return f"{self.target_dir/area_id/spec}.txt"
- def _load_cdq_history_rp(self):
- """
- 加载超短期历史实发功率
- """
- dt = datetime.strptime(self.opt.moment, '%Y%m%d_%H%M')
- dts = [x.strftime("%Y%m%d_%H%M") for x in [dt, dt-timedelta(minutes=15), dt-timedelta(minutes=30)]]
- def wrapper_path_cdq_his(points):
- segs = self.opt.cdqyc_base_path.split('/')
- cdq_his = []
- for point in points:
- his_file = f'{self.opt.doc_cdq_mapping['history_power']}_{segs[0]}_{segs[1]}_{point}.txt'
- cdq_his.append(os.path.join(self.opt.cdqyc_base_path, his_file))
- return cdq_his
- cdq_his_rp = []
- for dt_text, his_file in zip(dts, wrapper_path_cdq_his(dts)):
- cdq_his_df = pd.read_csv(his_file)
- cdq_his_df['moment'] = dt_text
- cdq_his_rp.append(cdq_his_df)
- cdq_his_rp = pd.concat(cdq_his_rp, axis=0)
- return cdq_his_rp
- def _load_dq_res(self, area_id):
- """
- 加载短期预测结果
- """
- dt_str = datetime.strptime(self.opt.moment, '%Y%m%d_%H%M').strftime("%Y-%m-%d")
- dq_path = os.path.join(self.opt.dqyc_base_path, area_id, dt_str, 'IN')
- all_stations = [str(child.parts[-1]) for child in Path(str(dq_path)).iterdir() if child.is_dir()]
- dqs = [pd.read_csv(os.path.join(str(dq_path), station, self.opt.doc_mapping['out'])) for station in all_stations]
- dqs.append(pd.read_csv(os.path.join(str(dq_path), self.opt.doc_mapping['area_out'])))
- dq = pd.concat(dqs)
- return dq
- def _load_material(self, station_id):
- """核心数据加载方法"""
- # 根据您的原始代码逻辑简化的加载流程
- try:
- basic = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['basic']), sep=r'\s+', header=0)
- power = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['power']), sep=r'\s+', header=0)
- plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
- assert plant_type == 0 or plant_type == 1
- # 根据电站类型加载数据
- nwp_v = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v']}"), sep=r'\s+', header=0)
- nwp_v_h = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v_h']}"), sep=r'\s+', header=0)
- nwp_own = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own']}"), sep=r'\s+', header=0)
- nwp_own_h = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own_h']}"), sep=r'\s+', header=0)
- if self.opt.switch_nwp_owner:
- nwp_v, nwp_v_h = nwp_own, nwp_own_h
- # 如果是风电
- if plant_type == 0:
- station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
- station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
- nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0)
- nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0)
- cap = float(station_info.loc[0, 'PlantCap'])
- if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf'])).exists():
- env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0)
- else:
- env = None
- # 如果是光伏
- else:
- station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
- station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
- nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0)
- nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0)
- cap = float(station_info.loc[0, 'PlantCap'])
- if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf'])).exists():
- env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0)
- else:
- env = None
- return types.SimpleNamespace(**{
- 'station_info': station_info,
- 'station_info_d': station_info_d,
- 'nwp': nwp,
- 'nwp_h': nwp_h,
- 'power': power,
- 'nwp_v': nwp_v,
- 'nwp_v_h': nwp_v_h,
- 'env': env,
- 'cap': cap
- })
- except Exception as e:
- print(f"Error loading {station_id}: {str(e)}")
- return None
- def _load_material_cdq(self, area_id):
- """超短期核心数据加载方法"""
- # 根据您的原始代码逻辑简化的加载流程
- try:
- basic = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_cdq_mapping['basic']), sep=r'\s+', header=0)
- basic_area = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_cdq_mapping['basic_area']), sep=r'\s+', header=0)
- plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
- assert plant_type == 0 or plant_type == 1
- # 根据电站类型加载数据
- # 如果是风电
- if plant_type == 0:
- station_info = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
- station_info_d = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
- cap = float(station_info.loc[0, 'PlantCap'])
- # 去短期预测结果中加载当日短期
- dq = self._load_dq_res(area_id)
- cdq_his_rp = self._load_cdq_history_rp()
- # 如果是光伏
- else:
- station_info = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
- station_info_d = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
- cap = float(station_info.loc[0, 'PlantCap'])
- # 去短期预测结果中加载当日短期
- dq = self._load_dq_res(area_id)
- cdq_his_rp = self._load_cdq_history_rp()
- return types.SimpleNamespace(**{
- 'station_info': station_info,
- 'station_info_d': station_info_d,
- 'dq': dq,
- 'cdq_his_rp': cdq_his_rp,
- 'cap': cap
- })
- except Exception as e:
- print(f"CDQ Error loading {area_id}: {str(e)}")
- return None
- def get_material(self, station_id):
- if self.lazy_load:
- if station_id not in self._data_cache:
- self._data_cache[station_id] = self._load_material(station_id)
- return self._data_cache[station_id]
- else:
- return self._load_material(station_id)
- def add_weights(self, data_objects):
- """对nwp数据进行cap加权(nwp, nwp_h, nwp_v_, nwp_v_h)"""
- def local_sum(df, weight):
- """内部函数:对DataFrame进行加权求和"""
- weighted_df = df.copy()
- columns_to_scale = [col for col in df.columns if col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
- weighted_df[columns_to_scale] = weighted_df[columns_to_scale] * weight
- return weighted_df, weight
- # 从data_objects解构对象
- nwp, nwp_h, nwp_v, nwp_v_h, power, cap = (
- data_objects.nwp,
- data_objects.nwp_h,
- data_objects.nwp_v,
- data_objects.nwp_v_h,
- data_objects.power,
- data_objects.cap
- )
- # 对每个NWP数据集进行容量加权
- weighted_nwp, cap = local_sum(nwp, cap)
- weighted_nwp_h, _ = local_sum(nwp_h, cap)
- weighted_nwp_v, _ = local_sum(nwp_v, cap)
- weighted_nwp_v_h, _ = local_sum(nwp_v_h, cap)
- return {
- 'nwp': weighted_nwp,
- 'nwp_h': weighted_nwp_h,
- 'nwp_v': weighted_nwp_v,
- 'nwp_v_h': weighted_nwp_v_h,
- 'cap': cap
- }
- def get_material_region(self):
- try:
- basic = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['basic_area']+'.txt'), sep=r'\s+', header=0)
- power = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['power_area']+'.txt'), sep=r'\s+', header=0)
- plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
- area_id = int(basic.loc[basic['PropertyID'].tolist().index('AreaId'), 'Value'])
- assert plant_type == 0 or plant_type == 1
- area_cap = float(basic.loc[basic['PropertyID'].tolist().index('AreaCap'), 'Value'])
- return types.SimpleNamespace(**{
- 'power': power,
- 'area_cap': area_cap,
- 'area_id': area_id
- })
- except Exception as e:
- print(f"Region Error loading: {str(e)}")
- return None
- def get_material_cdq(self):
- pass
- if __name__ == "__main__":
- run_code = 0
|