# material.py (10 KB)
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # @FileName :material.py
  4. # @Time :2025/4/29 11:07
  5. # @Author :David
  6. # @Company: shenyang JY
  7. import os.path
  8. import types
  9. import pandas as pd
  10. from pathlib import Path
  11. from datetime import datetime, timedelta
  12. from app.common.config import logger, parser
  13. from concurrent.futures import ThreadPoolExecutor
  14. from functools import partial
  15. class MaterialLoader:
  16. def __init__(self, opt, lazy_load=True):
  17. self.lazy_load = lazy_load
  18. self._data_cache = {}
  19. self.opt = opt
  20. self.target_dir = os.path.join(opt.dqyc_base_path, opt.input_file)
  21. self.target_dir_cdq = os.path.join(opt.cdqyc_base_path, opt.moment)
  22. def wrapper_path(self, station_id, spec):
  23. return f"{self.target_dir/station_id/spec}.txt"
  24. def wrapper_path_cdq(self, area_id, spec):
  25. return f"{self.target_dir/area_id/spec}.txt"
  26. def _load_cdq_history_rp(self):
  27. """
  28. 加载超短期历史实发功率
  29. """
  30. dt = datetime.strptime(self.opt.moment, '%Y%m%d_%H%M')
  31. dts = [x.strftime("%Y%m%d_%H%M") for x in [dt, dt-timedelta(minutes=15), dt-timedelta(minutes=30)]]
  32. def wrapper_path_cdq_his(points):
  33. segs = self.opt.cdqyc_base_path.split('/')
  34. cdq_his = []
  35. for point in points:
  36. his_file = f'{self.opt.doc_cdq_mapping['history_power']}_{segs[0]}_{segs[1]}_{point}.txt'
  37. cdq_his.append(os.path.join(self.opt.cdqyc_base_path, his_file))
  38. return cdq_his
  39. cdq_his_rp = []
  40. for dt_text, his_file in zip(dts, wrapper_path_cdq_his(dts)):
  41. cdq_his_df = pd.read_csv(his_file)
  42. cdq_his_df['moment'] = dt_text
  43. cdq_his_rp.append(cdq_his_df)
  44. cdq_his_rp = pd.concat(cdq_his_rp, axis=0)
  45. return cdq_his_rp
  46. def _load_dq_res(self, area_id):
  47. """
  48. 加载短期预测结果
  49. """
  50. dt_str = datetime.strptime(self.opt.moment, '%Y%m%d_%H%M').strftime("%Y-%m-%d")
  51. dq_path = os.path.join(self.opt.dqyc_base_path, area_id, dt_str, 'IN')
  52. all_stations = [str(child.parts[-1]) for child in Path(str(dq_path)).iterdir() if child.is_dir()]
  53. dqs = [pd.read_csv(os.path.join(str(dq_path), station, self.opt.doc_mapping['out'])) for station in all_stations]
  54. dqs.append(pd.read_csv(os.path.join(str(dq_path), self.opt.doc_mapping['area_out'])))
  55. dq = pd.concat(dqs)
  56. return dq
  57. def _load_material(self, station_id):
  58. """核心数据加载方法"""
  59. # 根据您的原始代码逻辑简化的加载流程
  60. try:
  61. basic = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['basic']), sep=r'\s+', header=0)
  62. power = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['power']), sep=r'\s+', header=0)
  63. plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
  64. assert plant_type == 0 or plant_type == 1
  65. # 根据电站类型加载数据
  66. nwp_v = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v']}"), sep=r'\s+', header=0)
  67. nwp_v_h = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v_h']}"), sep=r'\s+', header=0)
  68. nwp_own = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own']}"), sep=r'\s+', header=0)
  69. nwp_own_h = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own_h']}"), sep=r'\s+', header=0)
  70. if self.opt.switch_nwp_owner:
  71. nwp_v, nwp_v_h = nwp_own, nwp_own_h
  72. # 如果是风电
  73. if plant_type == 0:
  74. station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
  75. station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
  76. nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0)
  77. nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0)
  78. cap = float(station_info.loc[0, 'PlantCap'])
  79. if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf'])).exists():
  80. env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0)
  81. else:
  82. env = None
  83. # 如果是光伏
  84. else:
  85. station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
  86. station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
  87. nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0)
  88. nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0)
  89. cap = float(station_info.loc[0, 'PlantCap'])
  90. if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf'])).exists():
  91. env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0)
  92. else:
  93. env = None
  94. return types.SimpleNamespace(**{
  95. 'station_info': station_info,
  96. 'station_info_d': station_info_d,
  97. 'nwp': nwp,
  98. 'nwp_h': nwp_h,
  99. 'power': power,
  100. 'nwp_v': nwp_v,
  101. 'nwp_v_h': nwp_v_h,
  102. 'env': env,
  103. 'cap': cap
  104. })
  105. except Exception as e:
  106. print(f"Error loading {station_id}: {str(e)}")
  107. return None
  108. def _load_material_cdq(self, area_id):
  109. """超短期核心数据加载方法"""
  110. # 根据您的原始代码逻辑简化的加载流程
  111. try:
  112. basic = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_cdq_mapping['basic']), sep=r'\s+', header=0)
  113. basic_area = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_cdq_mapping['basic_area']), sep=r'\s+', header=0)
  114. plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
  115. assert plant_type == 0 or plant_type == 1
  116. # 根据电站类型加载数据
  117. # 如果是风电
  118. if plant_type == 0:
  119. station_info = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
  120. station_info_d = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
  121. cap = float(station_info.loc[0, 'PlantCap'])
  122. # 去短期预测结果中加载当日短期
  123. dq = self._load_dq_res(area_id)
  124. cdq_his_rp = self._load_cdq_history_rp()
  125. # 如果是光伏
  126. else:
  127. station_info = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
  128. station_info_d = pd.read_csv(self.wrapper_path_cdq(area_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
  129. cap = float(station_info.loc[0, 'PlantCap'])
  130. # 去短期预测结果中加载当日短期
  131. dq = self._load_dq_res(area_id)
  132. cdq_his_rp = self._load_cdq_history_rp()
  133. return types.SimpleNamespace(**{
  134. 'station_info': station_info,
  135. 'station_info_d': station_info_d,
  136. 'dq': dq,
  137. 'cdq_his_rp': cdq_his_rp,
  138. 'cap': cap
  139. })
  140. except Exception as e:
  141. print(f"CDQ Error loading {area_id}: {str(e)}")
  142. return None
  143. def get_material(self, station_id):
  144. if self.lazy_load:
  145. if station_id not in self._data_cache:
  146. self._data_cache[station_id] = self._load_material(station_id)
  147. return self._data_cache[station_id]
  148. else:
  149. return self._load_material(station_id)
  150. def add_weights(self, data_objects):
  151. """对nwp数据进行cap加权(nwp, nwp_h, nwp_v_, nwp_v_h)"""
  152. def local_sum(df, weight):
  153. """内部函数:对DataFrame进行加权求和"""
  154. weighted_df = df.copy()
  155. columns_to_scale = [col for col in df.columns if col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
  156. weighted_df[columns_to_scale] = weighted_df[columns_to_scale] * weight
  157. return weighted_df, weight
  158. # 从data_objects解构对象
  159. nwp, nwp_h, nwp_v, nwp_v_h, power, cap = (
  160. data_objects.nwp,
  161. data_objects.nwp_h,
  162. data_objects.nwp_v,
  163. data_objects.nwp_v_h,
  164. data_objects.power,
  165. data_objects.cap
  166. )
  167. # 对每个NWP数据集进行容量加权
  168. weighted_nwp, cap = local_sum(nwp, cap)
  169. weighted_nwp_h, _ = local_sum(nwp_h, cap)
  170. weighted_nwp_v, _ = local_sum(nwp_v, cap)
  171. weighted_nwp_v_h, _ = local_sum(nwp_v_h, cap)
  172. return {
  173. 'nwp': weighted_nwp,
  174. 'nwp_h': weighted_nwp_h,
  175. 'nwp_v': weighted_nwp_v,
  176. 'nwp_v_h': weighted_nwp_v_h,
  177. 'cap': cap
  178. }
  179. def get_material_region(self):
  180. try:
  181. basic = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['basic_area']+'.txt'), sep=r'\s+', header=0)
  182. power = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['power_area']+'.txt'), sep=r'\s+', header=0)
  183. plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
  184. area_id = int(basic.loc[basic['PropertyID'].tolist().index('AreaId'), 'Value'])
  185. assert plant_type == 0 or plant_type == 1
  186. area_cap = float(basic.loc[basic['PropertyID'].tolist().index('AreaCap'), 'Value'])
  187. return types.SimpleNamespace(**{
  188. 'power': power,
  189. 'area_cap': area_cap,
  190. 'area_id': area_id
  191. })
  192. except Exception as e:
  193. print(f"Region Error loading: {str(e)}")
  194. return None
  195. def get_material_cdq(self):
  196. pass
  197. if __name__ == "__main__":
  198. run_code = 0