material.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # @FileName :material.py
  4. # @Time :2025/4/29 11:07
  5. # @Author :David
  6. # @Company: shenyang JY
  7. import os.path
  8. import types
  9. import pandas as pd
  10. from pathlib import Path
  11. from app.common.config import logger, parser
  12. from concurrent.futures import ThreadPoolExecutor
  13. from functools import partial
  14. class MaterialLoader:
  15. def __init__(self, target_dir, lazy_load=True):
  16. self.lazy_load = lazy_load
  17. self._data_cache = {}
  18. self.opt = parser.parse_args_and_yaml()
  19. self.target_dir = target_dir
  20. def wrapper_path(self, station_id, spec):
  21. return f"{Path(self.target_dir)/station_id/spec}.txt"
  22. def wrapper_path_cdq(self, area_id, spec):
  23. return f"{self.target_dir/area_id/spec}.txt"
  24. def _load_material(self, station_id):
  25. """核心数据加载方法"""
  26. # 根据您的原始代码逻辑简化的加载流程
  27. try:
  28. basic = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['basic']), sep=r'\s+', header=0)
  29. power = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['power']), sep=r'\s+', header=0)
  30. plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
  31. assert plant_type == 0 or plant_type == 1
  32. # 根据电站类型加载数据
  33. nwp_v = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v']}"), sep=r'\s+', header=0)
  34. nwp_v_h = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v_h']}"), sep=r'\s+', header=0)
  35. nwp_own = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own']}"), sep=r'\s+', header=0)
  36. nwp_own_h = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own_h']}"), sep=r'\s+', header=0)
  37. if self.opt.switch_nwp_owner:
  38. nwp_v, nwp_v_h = nwp_own, nwp_own_h
  39. # 如果是风电
  40. if plant_type == 0:
  41. station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
  42. station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
  43. nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0)
  44. nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0)
  45. cap = float(station_info.loc[0, 'PlantCap'])
  46. if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf'])).exists():
  47. env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0)
  48. else:
  49. env = None
  50. # 如果是光伏
  51. else:
  52. station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
  53. station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
  54. nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0)
  55. nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0)
  56. cap = float(station_info.loc[0, 'PlantCap'])
  57. if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf'])).exists():
  58. env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0)
  59. else:
  60. env = None
  61. return types.SimpleNamespace(**{
  62. 'station_info': station_info,
  63. 'station_info_d': station_info_d,
  64. 'nwp': nwp,
  65. 'nwp_h': nwp_h,
  66. 'power': power,
  67. 'nwp_v': nwp_v,
  68. 'nwp_v_h': nwp_v_h,
  69. 'env': env,
  70. 'cap': cap
  71. })
  72. except Exception as e:
  73. print(f"Error loading {station_id}: {str(e)}")
  74. return None
  75. def _load_material_cdq(self, area_id, moment):
  76. """核心数据加载方法"""
  77. # 根据您的原始代码逻辑简化的加载流程
  78. try:
  79. basic = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_cdq_mapping['basic']), sep=r'\s+', header=0)
  80. basic_area = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_cdq_mapping['basic_area']), sep=r'\s+', header=0)
  81. plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
  82. assert plant_type == 0 or plant_type == 1
  83. # 根据电站类型加载数据
  84. if self.opt.switch_nwp_owner:
  85. nwp_v, nwp_v_h = nwp_own, nwp_own_h
  86. # 如果是风电
  87. if plant_type == 0:
  88. station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
  89. station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
  90. nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0)
  91. nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0)
  92. cap = float(station_info.loc[0, 'PlantCap'])
  93. if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf'])).exists():
  94. env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0)
  95. else:
  96. env = None
  97. # 如果是光伏
  98. else:
  99. station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
  100. station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
  101. nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0)
  102. nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0)
  103. cap = float(station_info.loc[0, 'PlantCap'])
  104. if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf'])).exists():
  105. env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0)
  106. else:
  107. env = None
  108. return types.SimpleNamespace(**{
  109. 'station_info': station_info,
  110. 'station_info_d': station_info_d,
  111. 'nwp': nwp,
  112. 'nwp_h': nwp_h,
  113. 'power': power,
  114. 'nwp_v': nwp_v,
  115. 'nwp_v_h': nwp_v_h,
  116. 'env': env,
  117. 'cap': cap
  118. })
  119. except Exception as e:
  120. print(f"Error loading {station_id}: {str(e)}")
  121. return None
  122. def get_material(self, station_id):
  123. if self.lazy_load:
  124. if station_id not in self._data_cache:
  125. self._data_cache[station_id] = self._load_material(station_id)
  126. return self._data_cache[station_id]
  127. else:
  128. return self._load_material(station_id)
  129. def add_weights(self, data_objects):
  130. """对nwp数据进行cap加权(nwp, nwp_h, nwp_v_, nwp_v_h)"""
  131. def local_sum(df, weight):
  132. """内部函数:对DataFrame进行加权求和"""
  133. weighted_df = df.copy()
  134. columns_to_scale = [col for col in df.columns if col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
  135. weighted_df[columns_to_scale] = weighted_df[columns_to_scale] * weight
  136. return weighted_df, weight
  137. # 从data_objects解构对象
  138. nwp, nwp_h, nwp_v, nwp_v_h, power, cap = (
  139. data_objects.nwp,
  140. data_objects.nwp_h,
  141. data_objects.nwp_v,
  142. data_objects.nwp_v_h,
  143. data_objects.power,
  144. data_objects.cap
  145. )
  146. # 对每个NWP数据集进行容量加权
  147. weighted_nwp, cap = local_sum(nwp, cap)
  148. weighted_nwp_h, _ = local_sum(nwp_h, cap)
  149. weighted_nwp_v, _ = local_sum(nwp_v, cap)
  150. weighted_nwp_v_h, _ = local_sum(nwp_v_h, cap)
  151. return {
  152. 'nwp': weighted_nwp,
  153. 'nwp_h': weighted_nwp_h,
  154. 'nwp_v': weighted_nwp_v,
  155. 'nwp_v_h': weighted_nwp_v_h,
  156. 'cap': cap
  157. }
  158. def get_material_region(self):
  159. try:
  160. basic = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['basic_area']+'.txt'), sep=r'\s+', header=0)
  161. power = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['power_area']+'.txt'), sep=r'\s+', header=0)
  162. plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
  163. area_id = int(basic.loc[basic['PropertyID'].tolist().index('AreaId'), 'Value'])
  164. assert plant_type == 0 or plant_type == 1
  165. area_cap = float(basic.loc[basic['PropertyID'].tolist().index('AreaCap'), 'Value'])
  166. return types.SimpleNamespace(**{
  167. 'power': power,
  168. 'area_cap': area_cap,
  169. 'area_id': area_id
  170. })
  171. except Exception as e:
  172. print(f"Region Error loading: {str(e)}")
  173. return None
  174. def get_material_cdq(self):
  175. pass
  176. if __name__ == "__main__":
  177. run_code = 0