#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :material.py
# @Time     :2025/4/29 11:07
# @Author   :David
# @Company  :shenyang JY
import os.path
import types
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
from app.common.config import logger, parser
from concurrent.futures import ThreadPoolExecutor
from functools import partial


class MaterialLoader:
    def __init__(self, opt, lazy_load=True):
        self.lazy_load = lazy_load
        self._data_cache = {}
        self.opt = opt
        # Use Path objects so the '/' joins in wrapper_path()/wrapper_path_cdq() work.
        self.target_dir = Path(opt.dqyc_base_path) / opt.input_file
        self.target_dir_cdq = Path(opt.cdqyc_base_path) / opt.moment

    def wrapper_path(self, station_id, spec):
        return f"{self.target_dir / station_id / spec}.txt"

    def wrapper_path_cdq(self, area_id, spec):
        return f"{self.target_dir_cdq / area_id / spec}.txt"

    def _load_cdq_history_rp(self):
        """Load the ultra-short-term historical actual power for the last three 15-minute points."""
        dt = datetime.strptime(self.opt.moment, '%Y%m%d_%H%M')
        dts = [x.strftime("%Y%m%d_%H%M") for x in [dt, dt - timedelta(minutes=15), dt - timedelta(minutes=30)]]

        def wrapper_path_cdq_his(points):
            segs = self.opt.cdqyc_base_path.split('/')
            cdq_his = []
            for point in points:
                his_file = f"{self.opt.doc_cdq_mapping['history_power']}_{segs[0]}_{segs[1]}_{point}.txt"
                cdq_his.append(os.path.join(self.opt.cdqyc_base_path, his_file))
            return cdq_his

        cdq_his_rp = []
        for dt_text, his_file in zip(dts, wrapper_path_cdq_his(dts)):
            cdq_his_df = pd.read_csv(his_file)
            cdq_his_df['moment'] = dt_text
            cdq_his_rp.append(cdq_his_df)
        cdq_his_rp = pd.concat(cdq_his_rp, axis=0)
        return cdq_his_rp

    def _load_dq_res(self, area_id):
        """Load the day-ahead (short-term) forecast results for every station in the area, plus the area-level result."""
        dt_str = datetime.strptime(self.opt.moment, '%Y%m%d_%H%M').strftime("%Y-%m-%d")
        dq_path = os.path.join(self.opt.dqyc_base_path, area_id, dt_str, 'IN')
        all_stations = [str(child.parts[-1]) for child in Path(str(dq_path)).iterdir() if child.is_dir()]
        dqs = [pd.read_csv(os.path.join(str(dq_path), station, self.opt.doc_mapping['out'])) for station in all_stations]
        dqs.append(pd.read_csv(os.path.join(str(dq_path), self.opt.doc_mapping['area_out'])))
        dq = pd.concat(dqs)
        return dq

    def _load_material(self, station_id):
        """Core data-loading method for a single station (day-ahead)."""
        # Loading flow simplified from the original logic.
        try:
            basic = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['basic']), sep=r'\s+', header=0)
            power = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['power']), sep=r'\s+', header=0)
            plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
            assert plant_type in (0, 1)
            # NWP data (vendor and own) loaded for both plant types
            nwp_v = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v']}"), sep=r'\s+', header=0)
            nwp_v_h = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v_h']}"), sep=r'\s+', header=0)
            nwp_own = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own']}"), sep=r'\s+', header=0)
            nwp_own_h = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own_h']}"), sep=r'\s+', header=0)
            if self.opt.switch_nwp_owner:
                nwp_v, nwp_v_h = nwp_own, nwp_own_h
            # Load the remaining data according to plant type: wind farm
            if plant_type == 0:
                station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
                station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
                nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0)
                nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0)
                cap = float(station_info.loc[0, 'PlantCap'])
                if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf'])).exists():
                    env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0)
                else:
                    env = None
            # Solar (PV) plant
            else:
                station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
                station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
                nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0)
                nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0)
                cap = float(station_info.loc[0, 'PlantCap'])
                if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf'])).exists():
                    env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0)
                else:
                    env = None
            return types.SimpleNamespace(**{
                'station_info': station_info,
                'station_info_d': station_info_d,
                'nwp': nwp,
                'nwp_h': nwp_h,
                'power': power,
                'nwp_v': nwp_v,
                'nwp_v_h': nwp_v_h,
                'env': env,
                'cap': cap
            })
        except Exception as e:
            logger.error(f"Error loading {station_id}: {str(e)}")
            return None

    def _load_material_cdq(self, area_id):
        """Core data-loading method for an area (ultra-short-term)."""
        # Loading flow simplified from the original logic.
        try:
            basic = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_cdq_mapping['basic']), sep=r'\s+', header=0)
            basic_area = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_cdq_mapping['basic_area']), sep=r'\s+', header=0)
            plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
            assert plant_type in (0, 1)
            # Load data according to plant type: wind farm
            if plant_type == 0:
                station_info = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
                station_info_d = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
                cap = float(station_info.loc[0, 'PlantCap'])
                # Load today's day-ahead forecast from the short-term results
                dq = self._load_dq_res(area_id)
                cdq_his_rp = self._load_cdq_history_rp()
            # Solar (PV) plant
            else:
                station_info = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
                station_info_d = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
                cap = float(station_info.loc[0, 'PlantCap'])
                # Load today's day-ahead forecast from the short-term results
                dq = self._load_dq_res(area_id)
                cdq_his_rp = self._load_cdq_history_rp()
            return types.SimpleNamespace(**{
                'station_info': station_info,
                'station_info_d': station_info_d,
                'dq': dq,
                'cdq_his_rp': cdq_his_rp,
                'cap': cap
            })
        except Exception as e:
            logger.error(f"CDQ Error loading {area_id}: {str(e)}")
            return None

    def get_material(self, station_id):
        if self.lazy_load:
            if station_id not in self._data_cache:
                self._data_cache[station_id] = self._load_material(station_id)
            return self._data_cache[station_id]
        else:
            return self._load_material(station_id)
    def add_weights(self, data_objects):
        """Apply capacity (cap) weighting to the NWP data (nwp, nwp_h, nwp_v, nwp_v_h)."""
        def local_sum(df, weight):
            """Scale every non-identifier column of the DataFrame by the capacity weight."""
            weighted_df = df.copy()
            columns_to_scale = [col for col in df.columns if col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
            weighted_df[columns_to_scale] = weighted_df[columns_to_scale] * weight
            return weighted_df, weight

        # Unpack the relevant objects from data_objects
        nwp, nwp_h, nwp_v, nwp_v_h, cap = (
            data_objects.nwp,
            data_objects.nwp_h,
            data_objects.nwp_v,
            data_objects.nwp_v_h,
            data_objects.cap
        )

        # Capacity-weight each NWP dataset
        weighted_nwp, _ = local_sum(nwp, cap)
        weighted_nwp_h, _ = local_sum(nwp_h, cap)
        weighted_nwp_v, _ = local_sum(nwp_v, cap)
        weighted_nwp_v_h, _ = local_sum(nwp_v_h, cap)

        return {
            'nwp': weighted_nwp,
            'nwp_h': weighted_nwp_h,
            'nwp_v': weighted_nwp_v,
            'nwp_v_h': weighted_nwp_v_h,
            'cap': cap
        }

    def get_material_region(self):
        try:
            basic = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['basic_area'] + '.txt'), sep=r'\s+', header=0)
            power = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['power_area'] + '.txt'), sep=r'\s+', header=0)
            plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
            area_id = int(basic.loc[basic['PropertyID'].tolist().index('AreaId'), 'Value'])
            assert plant_type in (0, 1)
            area_cap = float(basic.loc[basic['PropertyID'].tolist().index('AreaCap'), 'Value'])
            return types.SimpleNamespace(**{
                'power': power,
                'area_cap': area_cap,
                'area_id': area_id
            })
        except Exception as e:
            logger.error(f"Region Error loading: {str(e)}")
            return None

    def get_material_cdq(self):
        pass


if __name__ == "__main__":
    run_code = 0
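
    # --- Hedged usage sketch (illustration only) ----------------------------
    # A minimal sketch of how MaterialLoader might be driven. The `opt` fields,
    # paths, and station id below are hypothetical placeholders, not real
    # project configuration; with the empty doc_mapping the file reads fail
    # inside _load_material(), which logs the error and returns None, so this
    # block runs without crashing.
    example_opt = types.SimpleNamespace(
        dqyc_base_path='/data/dqyc',      # assumed day-ahead input root (placeholder)
        cdqyc_base_path='/data/cdqyc',    # assumed ultra-short-term input root (placeholder)
        input_file='IN',                  # assumed input sub-directory name (placeholder)
        moment='20250429_1100',           # issue time, format %Y%m%d_%H%M
        switch_nwp_owner=False,
        doc_mapping={},                   # project file-name mapping (placeholder)
        doc_cdq_mapping={},               # project file-name mapping (placeholder)
    )
    loader = MaterialLoader(example_opt, lazy_load=True)
    material = loader.get_material('10001')  # hypothetical station id
    if material is not None:
        weighted = loader.add_weights(material)
        print(f"capacity weighting applied, cap={weighted['cap']}")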