#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :material.py
# @Time :2025/4/29 11:07
# @Author :David
# @Company: shenyang JY
import os.path
import types
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from pathlib import Path
from typing import Optional

import pandas as pd

from app.common.config import logger, parser
class MaterialLoader:
    """Load per-station input tables and build a capacity-weighted regional NWP.

    Station tables are whitespace-delimited text files located at
    ``<base_path>/<station_id>/<name>.txt``; the ``<name>`` parts come from
    the ``doc_mapping`` option. Every station that is loaded through the lazy
    cache adds its four NWP tables, multiplied by the station capacity, to
    running sums. :meth:`get_material_region` divides those sums by the total
    capacity to obtain capacity-weighted means.
    """

    # Identifier / timestamp columns that are carried through without scaling.
    _ID_COLUMNS = ('PlantID', 'PlantName', 'Datetime')

    # ``doc_mapping`` keys that depend on the ``PlantType`` value of the basic
    # table: 0 selects the wind files, 1 selects the solar (PV) files.
    _TYPE_KEYS = {
        0: {
            'station_info': 'station_info_w',
            'station_info_d': 'station_info_d_w',
            'nwp': 'nwp_w',
            'nwp_h': 'nwp_w_h',
            'env': 'env_wf',
        },
        1: {
            'station_info': 'station_info_s',
            'station_info_d': 'station_info_d_s',
            'nwp': 'nwp_s',
            'nwp_h': 'nwp_s_h',
            'env': 'env_sf',
        },
    }

    def __init__(self, base_path, lazy_load=True):
        """Create a loader rooted at *base_path*.

        Args:
            base_path: Directory that contains one sub-directory per station
                plus the region-level files.
            lazy_load: When true, :meth:`get_material` caches each station and
                accumulates its weighted NWP data on first load.
        """
        self.base_path = Path(base_path)
        self.lazy_load = lazy_load
        self._data_cache = {}
        self.opt = parser.parse_args_and_yaml()
        # Running total of station capacities and the matching weighted sums.
        self.sum_cap = 0
        self.weighted_nwp = pd.DataFrame()
        self.weighted_nwp_h = pd.DataFrame()
        self.weighted_nwp_v = pd.DataFrame()
        self.weighted_nwp_v_h = pd.DataFrame()

    def wrapper_path(self, station_id, spec) -> str:
        """Return ``<base_path>/<station_id>/<spec>.txt`` as a string.

        *station_id* is converted with ``str`` so numeric ids are accepted.
        """
        return f"{self.base_path / str(station_id) / spec}.txt"

    def _read_table(self, station_id, spec) -> pd.DataFrame:
        """Read one whitespace-delimited station table with a header row."""
        return pd.read_csv(self.wrapper_path(station_id, spec), sep=r'\s+', header=0)

    @staticmethod
    def _basic_value(basic: pd.DataFrame, property_id: str):
        """Return the ``Value`` of the first row whose ``PropertyID`` matches.

        Raises:
            ValueError: If no row carries *property_id*.
        """
        # A boolean mask does not depend on the frame having a default index.
        matches = basic.loc[basic['PropertyID'] == property_id, 'Value']
        if matches.empty:
            raise ValueError(f"Property '{property_id}' not found in basic table")
        return matches.iloc[0]

    def _load_material(self, station_id) -> Optional[types.SimpleNamespace]:
        """Read every table of one station from disk.

        Returns:
            A namespace with ``station_info``, ``station_info_d``, ``nwp``,
            ``nwp_h``, ``power``, ``nwp_v``, ``nwp_v_h``, ``env`` (``None``
            when the file does not exist) and ``cap``; or ``None`` when
            anything goes wrong (the error is printed).
        """
        try:
            mapping = self.opt.doc_mapping
            basic = self._read_table(station_id, mapping['basic'])
            power = self._read_table(station_id, mapping['power'])

            plant_type = int(self._basic_value(basic, 'PlantType'))
            keys = self._TYPE_KEYS.get(plant_type)
            if keys is None:
                # Explicit raise: an assert would vanish under ``python -O``.
                raise ValueError(f"Unsupported PlantType: {plant_type}")

            # Only the NWP pair that is actually returned is read, so a
            # missing file of the unused source no longer fails the load.
            if self.opt.switch_nwp_owner:
                v_specs = (f"1/{mapping['nwp_own']}", f"1/{mapping['nwp_own_h']}")
            else:
                v_specs = (f"0/{mapping['nwp_v']}", f"0/{mapping['nwp_v_h']}")
            nwp_v, nwp_v_h = (self._read_table(station_id, spec) for spec in v_specs)

            station_info = self._read_table(station_id, mapping[keys['station_info']])
            station_info_d = self._read_table(station_id, mapping[keys['station_info_d']])
            nwp = self._read_table(station_id, mapping[keys['nwp']])
            nwp_h = self._read_table(station_id, mapping[keys['nwp_h']])
            cap = float(station_info.loc[0, 'PlantCap'])

            # The environment table is optional.
            env_spec = mapping[keys['env']]
            if Path(self.wrapper_path(station_id, env_spec)).exists():
                env = self._read_table(station_id, env_spec)
            else:
                env = None

            return types.SimpleNamespace(
                station_info=station_info,
                station_info_d=station_info_d,
                nwp=nwp,
                nwp_h=nwp_h,
                power=power,
                nwp_v=nwp_v,
                nwp_v_h=nwp_v_h,
                env=env,
                cap=cap,
            )
        except Exception as e:
            # NOTE(review): the module imports ``logger`` but reports through
            # print; kept as-is so the emitted text does not change.
            print(f"Error loading {station_id}: {str(e)}")
            return None

    def get_material(self, station_id) -> Optional[types.SimpleNamespace]:
        """Return the material of *station_id*, or ``None`` if loading failed.

        With ``lazy_load`` enabled the result (including a failed ``None``) is
        cached, and a successfully loaded station is added to the weighted
        sums exactly once. Without it the station is re-read on every call
        and nothing is accumulated.
        """
        if not self.lazy_load:
            return self._load_material(station_id)

        if station_id not in self._data_cache:
            material = self._load_material(station_id)
            self._data_cache[station_id] = material
            # A failed load has no tables to weight.
            if material is not None:
                self.add_weights(material)
        return self._data_cache[station_id]

    @classmethod
    def _accumulate_weighted(cls, total: pd.DataFrame, frame: pd.DataFrame, weight) -> pd.DataFrame:
        """Add ``frame * weight`` to *total* on all non-identifier columns.

        An empty *total* is initialised from a copy of *frame*.

        Raises:
            ValueError: If *total* is not empty and its columns differ from
                those of *frame*.
        """
        value_columns = [col for col in frame.columns if col not in cls._ID_COLUMNS]
        if total.empty:
            total = frame.copy()
            total[value_columns] = frame[value_columns] * weight
            return total
        if set(total.columns) != set(frame.columns):
            raise ValueError("DataFrame列不匹配")
        total[value_columns] += frame[value_columns] * weight
        return total

    def add_weights(self, data_objects) -> None:
        """Add one station's NWP tables, weighted by its capacity, to the sums.

        Args:
            data_objects: Object exposing ``nwp``, ``nwp_h``, ``nwp_v``,
                ``nwp_v_h`` (DataFrames) and ``cap`` (the weight).
        """
        cap = data_objects.cap
        # Total capacity is the divisor used when the region mean is built.
        self.sum_cap += cap
        self.weighted_nwp = self._accumulate_weighted(self.weighted_nwp, data_objects.nwp, cap)
        self.weighted_nwp_h = self._accumulate_weighted(self.weighted_nwp_h, data_objects.nwp_h, cap)
        self.weighted_nwp_v = self._accumulate_weighted(self.weighted_nwp_v, data_objects.nwp_v, cap)
        self.weighted_nwp_v_h = self._accumulate_weighted(self.weighted_nwp_v_h, data_objects.nwp_v_h, cap)

    def _capacity_mean(self, weighted: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of *weighted* divided by the total capacity.

        The accumulator itself is left untouched so the result is the same on
        every call and further stations can still be added afterwards.

        Raises:
            ValueError: If data was accumulated but the total capacity is 0.
        """
        mean = weighted.copy()
        if mean.empty:
            return mean
        if self.sum_cap == 0:
            raise ValueError("Total capacity is zero; cannot normalise weighted NWP data")
        value_columns = [col for col in mean.columns if col not in self._ID_COLUMNS]
        mean[value_columns] = mean[value_columns] / self.sum_cap
        return mean

    def get_material_region(self) -> Optional[types.SimpleNamespace]:
        """Return the region-level material built from the loaded stations.

        Reads the region ``basic`` and ``power`` tables named by the
        ``doc_area_mapping`` option directly under ``base_path``.

        Returns:
            A namespace with ``nwp``, ``nwp_h``, ``nwp_v``, ``nwp_v_h`` (all
            four as capacity-weighted means), ``power`` and ``area_cap``; or
            ``None`` when anything goes wrong (the error is printed).
        """
        try:
            mapping = self.opt.doc_area_mapping
            basic = pd.read_csv(os.path.join(self.base_path, mapping['basic']), sep=r'\s+', header=0)
            power = pd.read_csv(os.path.join(self.base_path, mapping['power']), sep=r'\s+', header=0)

            plant_type = int(self._basic_value(basic, 'PlantType'))
            if plant_type not in self._TYPE_KEYS:
                raise ValueError(f"Unsupported PlantType: {plant_type}")
            # AreaId is not used further, but a missing or non-integer value
            # is still treated as an invalid basic table.
            int(self._basic_value(basic, 'AreaId'))
            area_cap = float(self._basic_value(basic, 'AreaCap'))

            return types.SimpleNamespace(
                nwp=self._capacity_mean(self.weighted_nwp),
                nwp_h=self._capacity_mean(self.weighted_nwp_h),
                power=power,
                nwp_v=self._capacity_mean(self.weighted_nwp_v),
                nwp_v_h=self._capacity_mean(self.weighted_nwp_v_h),
                area_cap=area_cap,
            )
        except Exception as e:
            print(f"Region Error loading: {str(e)}")
            return None
if __name__ == "__main__":
    # Placeholder entry point: running the module directly only sets this flag.
    run_code = 0