#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :material.py
# @Time :2025/4/29 11:07
# @Author :David
# @Company: shenyang JY
import os.path
import types
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
from app.common.config import logger, parser
from concurrent.futures import ThreadPoolExecutor
from functools import partial


class MaterialLoader:
    def __init__(self, opt, lazy_load=True):
        self.lazy_load = lazy_load
        self._data_cache = {}
        self.opt = opt
        self.target_dir = os.path.join(opt.dqyc_base_path, opt.input_file)
        self.target_dir_cdq = os.path.join(opt.cdqyc_base_path, opt.moment)

    def wrapper_path(self, spec):
        return os.path.join(str(self.target_dir), f"{spec}.txt")

    def wrapper_path_cdq(self, spec):
        return os.path.join(str(self.target_dir_cdq), 'IN', f"{spec}.txt")

    def _load_cdq_history_rp(self, area_id, plant_type):
        """
        Load the ultra-short-term historical actual power: the actual power at
        the current moment and at the two preceding moments.
        """
        dt = datetime.strptime(self.opt.moment, '%Y%m%d_%H%M')
        dts = [x.strftime("%Y%m%d_%H%M") for x in [dt, dt - timedelta(minutes=15), dt - timedelta(minutes=30)]]

        def wrapper_path_cdq_his(points):
            cdq_his = []
            for point in points:
                his_file = f"{self.opt.doc_cdq_mapping['history_power']}_{area_id}_{plant_type}_{point}.txt"
                cdq_his.append(os.path.join(self.opt.cdqyc_base_path, his_file))
            return cdq_his

        cdq_his_rp = []
        for dt_text, his_file in zip(dts, wrapper_path_cdq_his(dts)):
            dt = datetime.strptime(dt_text, '%Y%m%d_%H%M').strftime("%Y-%m-%d %H:%M:%S")
            cdq_his_df = pd.read_csv(his_file, sep=r'\s+', header=0)
            cdq_his_df['moment'] = dt
            cdq_his_rp.append(cdq_his_df)
        cdq_his_rp = pd.concat(cdq_his_rp, axis=0)
        return cdq_his_rp
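
    # Illustrative example: with opt.moment == '20250429_1100' (hypothetical
    # value), the three history files read above correspond to 20250429_1100,
    # 20250429_1045 and 20250429_1030; each frame is tagged with a 'moment'
    # column formatted as '%Y-%m-%d %H:%M:%S' before concatenation.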

    def _load_dq_res(self, area_id, plant_type, begin_time):
        """
        Load the short-term forecast results: station-level and area-level short-term.
        """
        task_id = str(self.opt.dq_area_task_mapping[int(area_id)][int(plant_type)])
        # Resolve the date: the ultra-short-term directory uses the current time;
        # subtracting one day from the ultra-short-term forecast start time gives
        # the date of the short-term time directory.
        dt_str = (datetime.strptime(begin_time, '%Y-%m-%d %H:%M:%S') - timedelta(days=1)).strftime("%Y-%m-%d")
        dq_path = os.path.join(str(self.opt.dqyc_base_path), task_id, area_id, dt_str, 'IN')
        dq_path_out = os.path.join(str(self.opt.dqyc_base_path), task_id, area_id, dt_str, 'OUT')
        all_stations = [str(child.parts[-1]) for child in Path(str(dq_path)).iterdir() if child.is_dir()]
        dqs = [pd.read_csv(os.path.join(str(dq_path), station, self.opt.doc_mapping['out'] + '.txt'), sep=r'\s+', header=0) for station in all_stations]
        dq = pd.concat(dqs)
        dq_area = pd.read_csv(os.path.join(str(dq_path_out), self.opt.doc_mapping['area_out'] + '.txt'), sep=r'\s+', header=0)
        return dq, dq_area
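
    # Illustrative example: for begin_time == '2025-04-30 00:15:00'
    # (hypothetical value), dt_str becomes '2025-04-29', so station-level
    # results are read from <dqyc_base_path>/<task_id>/<area_id>/2025-04-29/IN
    # and the area-level result from the sibling OUT directory.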

    def _load_material(self, station_id):
        """Core data loading method for a single station."""
        # Loading flow simplified from the original code logic
        try:
            basic = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['basic']), sep=r'\s+', header=0)
            power = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['power']), sep=r'\s+', header=0)
            plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
            assert plant_type == 0 or plant_type == 1
            # Load data according to the plant type
            nwp_v = pd.read_csv(self.wrapper_path(f"0/{self.opt.doc_mapping['nwp_v']}"), sep=r'\s+', header=0)
            nwp_v_h = pd.read_csv(self.wrapper_path(f"0/{self.opt.doc_mapping['nwp_v_h']}"), sep=r'\s+', header=0)
            nwp_own = pd.read_csv(self.wrapper_path(f"1/{self.opt.doc_mapping['nwp_own']}"), sep=r'\s+', header=0)
            nwp_own_h = pd.read_csv(self.wrapper_path(f"1/{self.opt.doc_mapping['nwp_own_h']}"), sep=r'\s+', header=0)
            if self.opt.switch_nwp_owner:
                nwp_v, nwp_v_h = nwp_own, nwp_own_h
            # Wind plant
            if plant_type == 0:
                station_info = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
                station_info_d = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
                nwp = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0)
                nwp_h = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0)
                cap = float(station_info.loc[0, 'PlantCap'])
                if Path(self.wrapper_path(self.opt.doc_mapping['env_wf'])).exists():
                    env = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0)
                else:
                    env = None
            # Photovoltaic (solar) plant
            else:
                station_info = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
                station_info_d = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
                nwp = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0)
                nwp_h = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0)
                cap = float(station_info.loc[0, 'PlantCap'])
                if Path(self.wrapper_path(self.opt.doc_mapping['env_sf'])).exists():
                    env = pd.read_csv(self.wrapper_path(self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0)
                else:
                    env = None
            return types.SimpleNamespace(**{
                'station_info': station_info,
                'station_info_d': station_info_d,
                'nwp': nwp,
                'nwp_h': nwp_h,
                'power': power,
                'nwp_v': nwp_v,
                'nwp_v_h': nwp_v_h,
                'env': env,
                'cap': cap
            })
        except Exception as e:
            print(f"Error loading {station_id}: {str(e)}")
            return None

    def get_material_cdq(self):
        """Core data loading method for the ultra-short-term (CDQ) forecast."""
        # Loading flow simplified from the original code logic
        try:
            area_id, plant_type = self.opt.input_file.split('/')[0], self.opt.input_file.split('/')[1]
            self.target_dir_cdq = os.path.join(str(self.target_dir_cdq), area_id, plant_type)
            basic = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['basic']), sep=r'\s+', header=0)
            basic_area = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['basic_area']), sep=r'\s+', header=0)
            plant_type1 = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
            begin_time = str(basic.loc[basic['PropertyID'].tolist().index('ForecastBeginTime'), 'Value'])
            end_time = str(basic.loc[basic['PropertyID'].tolist().index('ForecastEndTime'), 'Value'])
            # assert plant_type == 0 or plant_type == 1
            # Load data according to the plant type
            # Wind plant
            if int(plant_type) == 0:
                station_info = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['station_info_w']), sep=r'\s+', header=0)
                station_info_d = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['station_info_d_w']), sep=r'\s+', header=0)
                cap = float(station_info.loc[0, 'PlantCap'])
                # Load today's short-term forecast from the short-term results
                dq, dq_area = self._load_dq_res(area_id, plant_type, begin_time)
                cdq_his_rp = self._load_cdq_history_rp(area_id, plant_type)
            # Photovoltaic (solar) plant
            else:
                station_info = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['station_info_s']), sep=r'\s+', header=0)
                station_info_d = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['station_info_d_s']), sep=r'\s+', header=0)
                cap = float(station_info.loc[0, 'PlantCap'])
                # Load today's short-term forecast from the short-term results
                dq, dq_area = self._load_dq_res(area_id, plant_type, begin_time)
                cdq_his_rp = self._load_cdq_history_rp(area_id, plant_type)
            return types.SimpleNamespace(**{
                'station_info': station_info,
                'station_info_d': station_info_d,
                'dq': dq,
                'dq_area': dq_area,
                'cdq_his_rp': cdq_his_rp,
                'cap': cap,
                'begin_time': begin_time,
                'end_time': end_time
            })
        except Exception as e:
            print(f"CDQ Error loading: {str(e)}")
            return None

    def get_material(self, station_id):
        if self.lazy_load:
            if station_id not in self._data_cache:
                # Rebuild the station directory from the configured base so that
                # repeated calls do not keep appending station ids to the path.
                self.target_dir = os.path.join(self.opt.dqyc_base_path, self.opt.input_file, station_id)
                self._data_cache[station_id] = self._load_material(station_id)
            return self._data_cache[station_id]
        else:
            return self._load_material(station_id)

    def add_weights(self, data_objects):
        """Capacity-weight the NWP data sets (nwp, nwp_h, nwp_v, nwp_v_h)."""
        def local_sum(df, weight):
            """Inner helper: scale the non-identifier columns of a DataFrame by the weight."""
            weighted_df = df.copy()
            columns_to_scale = [col for col in df.columns if col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
            weighted_df[columns_to_scale] = weighted_df[columns_to_scale] * weight
            return weighted_df, weight

        # Unpack the objects from data_objects
        nwp, nwp_h, nwp_v, nwp_v_h, power, cap = (
            data_objects.nwp,
            data_objects.nwp_h,
            data_objects.nwp_v,
            data_objects.nwp_v_h,
            data_objects.power,
            data_objects.cap
        )
        # Apply the capacity weighting to each NWP data set
        weighted_nwp, cap = local_sum(nwp, cap)
        weighted_nwp_h, _ = local_sum(nwp_h, cap)
        weighted_nwp_v, _ = local_sum(nwp_v, cap)
        weighted_nwp_v_h, _ = local_sum(nwp_v_h, cap)
        return {
            'nwp': weighted_nwp,
            'nwp_h': weighted_nwp_h,
            'nwp_v': weighted_nwp_v,
            'nwp_v_h': weighted_nwp_v_h,
            'cap': cap
        }
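
    # Illustrative example (hypothetical column names): with cap == 49.5 and an
    # NWP frame whose non-identifier columns include e.g. 'WS10' and 'WS100',
    # local_sum multiplies those columns by 49.5 while 'PlantID', 'PlantName',
    # 'PlantType', 'Qbsj' and 'Datetime' are left untouched; the capacity is
    # returned unchanged alongside the weighted frame.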

    def get_material_region(self):
        try:
            basic = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['basic_area'] + '.txt'), sep=r'\s+', header=0)
            power = pd.read_csv(os.path.join(str(self.target_dir), self.opt.doc_mapping['power_area'] + '.txt'), sep=r'\s+', header=0)
            plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
            area_id = int(basic.loc[basic['PropertyID'].tolist().index('AreaId'), 'Value'])
            assert plant_type == 0 or plant_type == 1
            area_cap = float(basic.loc[basic['PropertyID'].tolist().index('AreaCap'), 'Value'])
            return types.SimpleNamespace(**{
                'power': power,
                'area_cap': area_cap,
                'area_id': area_id
            })
        except Exception as e:
            print(f"Region Error loading: {str(e)}")
            return None


if __name__ == "__main__":
    run_code = 0
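    # Minimal usage sketch (illustrative; assumes `opt` is the parsed option
    # namespace this module expects, e.g. built via app.common.config.parser):
    #
    #   loader = MaterialLoader(opt, lazy_load=True)
    #   materials = loader.get_material("302")        # hypothetical station id
    #   if materials is not None:
    #       weighted = loader.add_weights(materials)
    #   region = loader.get_material_region()
    #   cdq = loader.get_material_cdq()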