#!/usr/bin/env python # -*- coding:utf-8 -*- # @FileName :material.py # @Time :2025/4/29 11:07 # @Author :David # @Company: shenyang JY import os.path import types import pandas as pd from pathlib import Path from app.common.config import logger, parser from concurrent.futures import ThreadPoolExecutor from functools import partial class MaterialLoader: def __init__(self, base_path, lazy_load=True): self.base_path = Path(base_path) self.lazy_load = lazy_load self._data_cache = {} self.opt = parser.parse_args_and_yaml() def wrapper_path(self, station_id, spec): return f"{self.base_path/station_id/spec}.txt" def _load_material(self, station_id): """核心数据加载方法""" # 根据您的原始代码逻辑简化的加载流程 try: basic = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['basic']), sep=r'\s+', header=0) power = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['power']), sep=r'\s+', header=0) plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value']) assert plant_type == 0 or plant_type == 1 # 根据电站类型加载数据 nwp_v = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v']}"), sep=r'\s+', header=0) nwp_v_h = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v_h']}"), sep=r'\s+', header=0) nwp_own = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own']}"), sep=r'\s+', header=0) nwp_own_h = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own_h']}"), sep=r'\s+', header=0) if self.opt.switch_nwp_owner: nwp_v, nwp_v_h = nwp_own, nwp_own_h # 如果是风电 if plant_type == 0: station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0) station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0) nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0) nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0) cap = float(station_info.loc[0, 'PlantCap']) if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf'])).exists(): env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0) else: env = None # 如果是光伏 else: station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0) station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0) nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0) nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0) cap = float(station_info.loc[0, 'PlantCap']) if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf'])).exists(): env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0) else: env = None return types.SimpleNamespace(**{ 'station_info': station_info, 'station_info_d': station_info_d, 'nwp': nwp, 'nwp_h': nwp_h, 'power': power, 'nwp_v': nwp_v, 'nwp_v_h': nwp_v_h, 'env': env, 'cap': cap }) except Exception as e: print(f"Error loading {station_id}: {str(e)}") return None def get_material(self, station_id): if self.lazy_load: if station_id not in self._data_cache: self._data_cache[station_id] = self._load_material(station_id) return self._data_cache[station_id] else: return self._load_material(station_id) def add_weights(self, data_objects): """对nwp数据进行cap加权(nwp, nwp_h, nwp_v_, nwp_v_h)""" def local_sum(df, weight): """内部函数:对DataFrame进行加权求和""" weighted_df = df.copy() columns_to_scale = [col for col in df.columns if col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']] weighted_df[columns_to_scale] = weighted_df[columns_to_scale] * weight return weighted_df, weight # 从data_objects解构对象 nwp, nwp_h, nwp_v, nwp_v_h, power, cap = ( data_objects.nwp, data_objects.nwp_h, data_objects.nwp_v, data_objects.nwp_v_h, data_objects.power, data_objects.cap ) # 对每个NWP数据集进行容量加权 weighted_nwp, cap = local_sum(nwp, cap) weighted_nwp_h, _ = local_sum(nwp_h, cap) weighted_nwp_v, _ = local_sum(nwp_v, cap) weighted_nwp_v_h, _ = local_sum(nwp_v_h, cap) return { 'nwp': weighted_nwp, 'nwp_h': weighted_nwp_h, 'nwp_v': weighted_nwp_v, 'nwp_v_h': weighted_nwp_v_h, 'cap': cap } def get_material_region(self): try: basic = pd.read_csv(os.path.join(self.base_path, self.opt.doc_area_mapping['basic']+'.txt'), sep=r'\s+', header=0) power = pd.read_csv(os.path.join(self.base_path, self.opt.doc_area_mapping['power']+'.txt'), sep=r'\s+', header=0) plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value']) area_id = int(basic.loc[basic['PropertyID'].tolist().index('AreaId'), 'Value']) assert plant_type == 0 or plant_type == 1 area_cap = float(basic.loc[basic['PropertyID'].tolist().index('AreaCap'), 'Value']) return types.SimpleNamespace(**{ 'power': power, 'area_cap': area_cap, 'area_id': area_id }) except Exception as e: print(f"Region Error loading: {str(e)}") return None if __name__ == "__main__": run_code = 0