123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # @FileName :material.py
- # @Time :2025/4/29 11:07
- # @Author :David
- # @Company: shenyang JY
- import os.path
- import types
- import pandas as pd
- from pathlib import Path
- from app.common.config import logger, parser
- from concurrent.futures import ThreadPoolExecutor
- from functools import partial
- class MaterialLoader:
- def __init__(self, base_path, lazy_load=True):
- self.base_path = Path(base_path)
- self.lazy_load = lazy_load
- self._data_cache = {}
- self.opt = parser.parse_args_and_yaml()
- def wrapper_path(self, station_id, spec):
- return f"{self.base_path/station_id/spec}.txt"
- def _load_material(self, station_id):
- """核心数据加载方法"""
- # 根据您的原始代码逻辑简化的加载流程
- try:
- basic = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['basic']), sep=r'\s+', header=0)
- power = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['power']), sep=r'\s+', header=0)
- plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
- assert plant_type == 0 or plant_type == 1
- # 根据电站类型加载数据
- nwp_v = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v']}"), sep=r'\s+', header=0)
- nwp_v_h = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v_h']}"), sep=r'\s+', header=0)
- nwp_own = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own']}"), sep=r'\s+', header=0)
- nwp_own_h = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own_h']}"), sep=r'\s+', header=0)
- if self.opt.switch_nwp_owner:
- nwp_v, nwp_v_h = nwp_own, nwp_own_h
- # 如果是风电
- if plant_type == 0:
- station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
- station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
- nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0)
- nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0)
- cap = float(station_info.loc[0, 'PlantCap'])
- if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf'])).exists():
- env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0)
- else:
- env = None
- # 如果是光伏
- else:
- station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
- station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
- nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0)
- nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0)
- cap = float(station_info.loc[0, 'PlantCap'])
- if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf'])).exists():
- env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0)
- else:
- env = None
- return types.SimpleNamespace(**{
- 'station_info': station_info,
- 'station_info_d': station_info_d,
- 'nwp': nwp,
- 'nwp_h': nwp_h,
- 'power': power,
- 'nwp_v': nwp_v,
- 'nwp_v_h': nwp_v_h,
- 'env': env,
- 'cap': cap
- })
- except Exception as e:
- print(f"Error loading {station_id}: {str(e)}")
- return None
- def get_material(self, station_id):
- if self.lazy_load:
- if station_id not in self._data_cache:
- self._data_cache[station_id] = self._load_material(station_id)
- return self._data_cache[station_id]
- else:
- return self._load_material(station_id)
- def add_weights(self, data_objects):
- """对nwp数据进行cap加权(nwp, nwp_h, nwp_v_, nwp_v_h)"""
- def local_sum(df, weight):
- """内部函数:对DataFrame进行加权求和"""
- weighted_df = df.copy()
- columns_to_scale = [col for col in df.columns if col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
- weighted_df[columns_to_scale] = weighted_df[columns_to_scale] * weight
- return weighted_df, weight
- # 从data_objects解构对象
- nwp, nwp_h, nwp_v, nwp_v_h, power, cap = (
- data_objects.nwp,
- data_objects.nwp_h,
- data_objects.nwp_v,
- data_objects.nwp_v_h,
- data_objects.power,
- data_objects.cap
- )
- # 对每个NWP数据集进行容量加权
- weighted_nwp, cap = local_sum(nwp, cap)
- weighted_nwp_h, _ = local_sum(nwp_h, cap)
- weighted_nwp_v, _ = local_sum(nwp_v, cap)
- weighted_nwp_v_h, _ = local_sum(nwp_v_h, cap)
- return {
- 'nwp': weighted_nwp,
- 'nwp_h': weighted_nwp_h,
- 'nwp_v': weighted_nwp_v,
- 'nwp_v_h': weighted_nwp_v_h,
- 'cap': cap
- }
- def get_material_region(self):
- try:
- basic = pd.read_csv(os.path.join(self.base_path, self.opt.doc_area_mapping['basic']+'.txt'), sep=r'\s+', header=0)
- power = pd.read_csv(os.path.join(self.base_path, self.opt.doc_area_mapping['power']+'.txt'), sep=r'\s+', header=0)
- plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
- area_id = int(basic.loc[basic['PropertyID'].tolist().index('AreaId'), 'Value'])
- assert plant_type == 0 or plant_type == 1
- area_cap = float(basic.loc[basic['PropertyID'].tolist().index('AreaCap'), 'Value'])
- return types.SimpleNamespace(**{
- 'power': power,
- 'area_cap': area_cap,
- 'area_id': area_id
- })
- except Exception as e:
- print(f"Region Error loading: {str(e)}")
- return None
- if __name__ == "__main__":
- run_code = 0
|