# task_worker.py
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # @FileName :task_worker.py
  4. # @Time :2025/4/29 11:05
  5. # @Author :David
  6. # @Company: shenyang JY
  7. import logging, os
  8. import pandas as pd
  9. from scipy.cluster.hierarchy import weighted
  10. from app.model.tf_model_train import ModelTrainer
  11. from app.predict.tf_model_pre import ModelPre
  12. from app.common.data_cleaning import key_field_row_cleaning
  13. from app.common.config import logger
  14. class TaskTrain(object):
  15. def __init__(self, loader):
  16. self.loader = loader
  17. def station_task(self, config):
  18. """场站级训练任务"""
  19. station_id = -99
  20. try:
  21. print("111")
  22. station_id = config['station_id']
  23. # 动态生成场站数据路径
  24. print("222")
  25. # 加载数据
  26. data_objects = self.loader.get_material(station_id)
  27. local_weights = self.loader.add_weights(data_objects)
  28. print("333")
  29. # 数据合并
  30. train_data = pd.merge(data_objects.nwp_v_h, data_objects.power, on=config['col_time'])
  31. print("444")
  32. # 模型训练
  33. # model = ModelTrainer(station_id, train_data, capacity=data_objects.cap, gpu_id=config.get('gpu_assignment'))
  34. model = ModelTrainer(train_data, capacity=data_objects.cap, config=config)
  35. model.train()
  36. print("555")
  37. return {'status': 'success', 'station_id': station_id, 'weights': local_weights}
  38. except Exception as e:
  39. logging.error(f"Station {station_id} failed: {str(e)}")
  40. return {'status': 'failed', 'station_id': station_id}
  41. def region_task(self, config, data_nwps):
  42. """区域级训练任务"""
  43. area_id = -99
  44. try:
  45. print("111")
  46. # 动态生成场站数据路径
  47. print("222")
  48. # 加载数据
  49. data_objects = self.loader.get_material_region()
  50. config['area_id'] = data_objects.area_id
  51. area_id = data_objects.area_id
  52. print("333")
  53. # 数据合并
  54. print(data_nwps.nwp)
  55. print(data_nwps.nwp_v)
  56. print("累加的区域装机量{},实际区域装机量{}".format(data_nwps.total_cap, data_objects.area_cap))
  57. train_data = pd.merge(data_nwps.nwp_v_h, data_objects.power, on=config['col_time'])
  58. print("444")
  59. # 模型训练
  60. model = ModelTrainer(train_data, capacity=data_objects.area_cap, config=config)
  61. model.train(pre_area=True)
  62. print("555")
  63. return {'status': 'success', 'area_id': area_id}
  64. except Exception as e:
  65. logging.error(f"Area {area_id} failed: {str(e)}")
  66. return {'status': 'failed', 'area_id': area_id}
  67. class TaskPre(object):
  68. def __init__(self, loader):
  69. self.loader = loader
  70. def station_task(self, config):
  71. """场站级训练任务"""
  72. station_id = -99
  73. try:
  74. print("111")
  75. station_id = config['station_id']
  76. # 动态生成场站数据路径
  77. print("222")
  78. # 加载数据
  79. data_objects = self.loader.get_material(station_id)
  80. local_weights = self.loader.add_weights(data_objects)
  81. print("333")
  82. # 数据合并
  83. pre_data = data_objects.nwp_v
  84. print("444")
  85. # 模型训练
  86. # model = ModelTrainer(station_id, train_data, capacity=data_objects.cap, gpu_id=config.get('gpu_assignment'))
  87. model = ModelPre(pre_data, capacity=data_objects.cap, config=config)
  88. model.predict()
  89. print("555")
  90. return {'status': 'success', 'station_id': station_id, 'weights': local_weights}
  91. except Exception as e:
  92. logging.error(f"Station {station_id} failed: {str(e)}")
  93. return {'status': 'failed', 'station_id': station_id}
  94. def region_task(self, config, data_nwps):
  95. """区域级训练任务"""
  96. area_id = -99
  97. try:
  98. print("111")
  99. # 动态生成场站数据路径
  100. print("222")
  101. # 加载数据
  102. data_objects = self.loader.get_material_region()
  103. config['area_id'] = data_objects.area_id
  104. area_id = data_objects.area_id
  105. print("333")
  106. # 数据合并
  107. print(data_nwps.nwp)
  108. print(data_nwps.nwp_v)
  109. print("累加的区域装机量{},实际区域装机量{}".format(data_nwps.total_cap, data_objects.area_cap))
  110. pre_data = data_nwps.nwp_v
  111. print("444")
  112. # 模型训练
  113. model = ModelPre(pre_data, capacity=data_objects.area_cap, config=config)
  114. model.predict(pre_area=True)
  115. print("555")
  116. return {'status': 'success', 'area_id': area_id}
  117. except Exception as e:
  118. logging.error(f"Area {area_id} failed: {str(e)}")
  119. return {'status': 'failed', 'area_id': area_id}
  120. class CDQTaskPre(object):
  121. def __init__(self, loader, opt):
  122. self.loader = loader
  123. self.opt = opt
  124. def calculate_dq_fix_weighted(self, dq, dq_area, his_rp, begin_time, end_time):
  125. """
  126. 1. 依次从场站编号、最后是区域编号进行遍历
  127. 2. 将遍历的历史功率和短期时间对齐,算出过去3个时刻的平均
  128. 3. 根据起止时间获取短期的16个点,根据偏差进行短期加权
  129. """
  130. def weighted_each_point(his_rp_term, dq_16, error):
  131. weighted = list(his_rp_term.head(1)[['Grade', 'Type', 'ID', 'Value']].values[0])
  132. dq_16['dq_fix'] = dq_16['Power'] + error
  133. for point, row in dq_16.iterrows():
  134. T = 'T'+str(point + 1)
  135. cdq_value = row['dq_fix']*self.opt.coe[T]['n'] + row['Power']*self.opt.coe[T]['m']
  136. weighted.append(cdq_value)
  137. return weighted
  138. def dq_fix_weighted(his_rp_term, dq_term):
  139. his_rp_dq = pd.merge(his_rp_term, dq_term, left_on='moment', right_on='Datetime')
  140. his_rp_dq_clean = key_field_row_cleaning(his_rp_dq, ['Value'], logger)
  141. if not his_rp_dq_clean.empty:
  142. his_rp_dq['error'] = his_rp_dq['Value'] - his_rp_dq['Power']
  143. error = his_rp_dq['error'].mean()
  144. else:
  145. error = 0
  146. dq_term = dq_term.set_index('Datetime')
  147. dq_16 = dq_term.loc[begin_time: end_time].reset_index(drop=False)
  148. cdq_16 = weighted_each_point(his_rp_term, dq_16, error)
  149. return cdq_16
  150. his_rp['moment'] = pd.to_datetime(his_rp['moment'])
  151. dq['Datetime'] = pd.to_datetime(dq['Datetime'])
  152. dq_area['Datetime'] = pd.to_datetime(dq_area['Datetime'])
  153. his_rp_plant = his_rp[his_rp['Grade']==1]
  154. his_rp_area = his_rp[his_rp['Grade']==0]
  155. all_weights = [dq_fix_weighted(his_rp_area, dq_area)]
  156. for id, dq_id in dq.groupby('PlantID'):
  157. his_rp_id = his_rp_plant[his_rp_plant['ID']==id]
  158. all_weights.append(dq_fix_weighted(his_rp_id, dq_id))
  159. weighted_cols = ['Grade', 'Type', 'ID', 'Value'] + ['P'+str(x) for x in range(1, 17)]
  160. weighted_cdq = pd.DataFrame(all_weights, columns=weighted_cols)
  161. return weighted_cdq
  162. def post_processing(self, df, station_info):
  163. # 假设原DataFrame为df,station_info为station_info_df
  164. # 步骤1:排除Grade=0的行
  165. grade_zero_mask = df['Grade'] == 0
  166. grade_zero_df = df[grade_zero_mask].copy()
  167. non_grade_zero_df = df[~grade_zero_mask].copy()
  168. # 步骤2:合并PlantCap信息
  169. merged_df = non_grade_zero_df.merge(
  170. station_info[['PlantID', 'PlantCap']],
  171. left_on='ID',
  172. right_on='PlantID',
  173. how='left'
  174. ).drop(columns=['PlantID']) # 移除多余的PlantID列
  175. # 步骤3:处理P1-P16列
  176. p_columns = [f'P{i}' for i in range(1, 17)]
  177. # 将大于PlantCap的值设为PlantCap,小于0的值设为0
  178. merged_df[p_columns] = merged_df[p_columns].clip(
  179. lower=0,
  180. upper=merged_df['PlantCap'],
  181. axis=0
  182. ).round(2) # 保留两位小数
  183. merged_df.drop(columns=['PlantCap'], inplace=True) # 移除临时列
  184. # 步骤4:合并处理后的数据与Grade=0的数据
  185. final_df = pd.concat([merged_df, grade_zero_df], axis=0).reset_index(drop=True)
  186. return final_df
  187. def cdq_task(self, config):
  188. """场站级训练任务"""
  189. station_id = -99
  190. try:
  191. print("111")
  192. # 动态生成场站数据路径
  193. print("222")
  194. # 加载数据
  195. data_objects = self.loader.get_material_cdq()
  196. print("333")
  197. dq = data_objects.dq
  198. dq_area = data_objects.dq_area
  199. his_rp = data_objects.cdq_his_rp
  200. begin_time, end_time = data_objects.begin_time, data_objects.end_time
  201. station_info = data_objects.station_info
  202. weighted_cdq = self.calculate_dq_fix_weighted(dq, dq_area, his_rp, begin_time, end_time)
  203. weighted_cdq = self.post_processing(weighted_cdq, station_info)
  204. print("444")
  205. out_dir_cdq = str(os.path.join(config['cdqyc_base_path'], config['moment'], config['input_file']))
  206. out_dir_cdq.replace('IN', 'OUT')
  207. weighted_cdq.to_csv(out_dir_cdq, index=False)
  208. print("555")
  209. return {'status': 'success', 'station_id': station_id, 'weights': local_weights}
  210. except Exception as e:
  211. logging.error(f"Station {station_id} failed: {str(e)}")
  212. return {'status': 'failed', 'station_id': station_id}