#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :task_worker.py
# @Time     :2025/4/29 11:05
# @Author   :David
# @Company  :shenyang JY
import logging
import os

import pandas as pd

from app.model.tf_model_train import ModelTrainer
from app.predict.tf_model_pre import ModelPre
from app.common.data_cleaning import key_field_row_cleaning
from app.common.config import logger


class TaskTrain(object):
    def __init__(self, loader):
        self.loader = loader

    def station_task(self, config):
        """Station-level training task."""
        station_id = -99
        try:
            print("111")
            station_id = config['station_id']
            # Dynamically generated station data path
            print("222")
            # Load data
            data_objects = self.loader.get_material(station_id)
            local_weights = self.loader.add_weights(data_objects)
            print("333")
            # Merge NWP features with measured power on the time column
            train_data = pd.merge(data_objects.nwp_v_h, data_objects.power, on=config['col_time'])
            print("444")
            # Model training
            # model = ModelTrainer(station_id, train_data, capacity=data_objects.cap, gpu_id=config.get('gpu_assignment'))
            model = ModelTrainer(train_data, capacity=data_objects.cap, config=config)
            model.train()
            print("555")
            return {'status': 'success', 'station_id': station_id, 'weights': local_weights}
        except Exception as e:
            logging.error(f"Station {station_id} failed: {str(e)}")
            return {'status': 'failed', 'station_id': station_id}

    def region_task(self, config, data_nwps):
        """Region-level training task."""
        area_id = -99
        try:
            print("111")
            # Dynamically generated station data path
            print("222")
            # Load data
            data_objects = self.loader.get_material_region()
            config['area_id'] = data_objects.area_id
            area_id = data_objects.area_id
            print("333")
            # Merge data
            print(data_nwps.nwp)
            print(data_nwps.nwp_v)
            print("Aggregated regional installed capacity: {}, actual regional installed capacity: {}".format(data_nwps.total_cap, data_objects.area_cap))
            train_data = pd.merge(data_nwps.nwp_v_h, data_objects.power, on=config['col_time'])
            print("444")
            # Model training
            model = ModelTrainer(train_data, capacity=data_objects.area_cap, config=config)
            model.train(pre_area=True)
            print("555")
            return {'status': 'success', 'area_id': area_id}
        except Exception as e:
            logging.error(f"Area {area_id} failed: {str(e)}")
            return {'status': 'failed', 'area_id': area_id}


class TaskPre(object):
    def __init__(self, loader):
        self.loader = loader

    def station_task(self, config):
        """Station-level prediction task."""
        station_id = -99
        try:
            print("111")
            station_id = config['station_id']
            # Dynamically generated station data path
            print("222")
            # Load data
            data_objects = self.loader.get_material(station_id)
            local_weights = self.loader.add_weights(data_objects)
            print("333")
            # Prediction input
            pre_data = data_objects.nwp_v
            print("444")
            # Model prediction
            # model = ModelTrainer(station_id, train_data, capacity=data_objects.cap, gpu_id=config.get('gpu_assignment'))
            model = ModelPre(pre_data, capacity=data_objects.cap, config=config)
            model.predict()
            print("555")
            return {'status': 'success', 'station_id': station_id, 'weights': local_weights}
        except Exception as e:
            logging.error(f"Station {station_id} failed: {str(e)}")
            return {'status': 'failed', 'station_id': station_id}

    def region_task(self, config, data_nwps):
        """Region-level prediction task."""
        area_id = -99
        try:
            print("111")
            # Dynamically generated station data path
            print("222")
            # Load data
            data_objects = self.loader.get_material_region()
            config['area_id'] = data_objects.area_id
            area_id = data_objects.area_id
            print("333")
            # Prediction input
            print(data_nwps.nwp)
            print(data_nwps.nwp_v)
            print("Aggregated regional installed capacity: {}, actual regional installed capacity: {}".format(data_nwps.total_cap, data_objects.area_cap))
            pre_data = data_nwps.nwp_v
            print("444")
            # Model prediction
            model = ModelPre(pre_data, capacity=data_objects.area_cap, config=config)
            model.predict(pre_area=True)
            print("555")
            return {'status': 'success', 'area_id': area_id}
        except Exception as e:
            logging.error(f"Area {area_id} failed: {str(e)}")
            return {'status': 'failed', 'area_id': area_id}


class CDQTaskPre(object):
    def __init__(self, loader, opt):
        self.loader = loader
        self.opt = opt

    def calculate_dq_fix_weighted(self, dq, dq_area, his_rp, begin_time, end_time):
        """
        1. Traverse the area forecast and then each station's forecast by ID.
        2. Align the historical power with the short-term forecast by timestamp and
           compute the mean deviation over the recent moments (the past 3 points).
        3. Take the 16 short-term points between the start and end times and blend
           them point by point according to that deviation.
        """
        def weighted_each_point(his_rp_term, dq_16, error):
            weighted = list(his_rp_term.head(1)[['Grade', 'Type', 'ID', 'Value']].values[0])
            dq_16['dq_fix'] = dq_16['Power'] + error
            for point, row in dq_16.iterrows():
                T = 'T' + str(point + 1)
                cdq_value = row['dq_fix'] * self.opt.coe[T]['n'] + row['Power'] * self.opt.coe[T]['m']
                weighted.append(cdq_value)
            return weighted

        def dq_fix_weighted(his_rp_term, dq_term):
            his_rp_dq = pd.merge(his_rp_term, dq_term, left_on='moment', right_on='Datetime')
            his_rp_dq_clean = key_field_row_cleaning(his_rp_dq, ['Value'], logger)
            if not his_rp_dq_clean.empty:
                # Mean deviation between actual power and the short-term forecast
                error = (his_rp_dq_clean['Value'] - his_rp_dq_clean['Power']).mean()
            else:
                error = 0
            dq_term = dq_term.set_index('Datetime')
            dq_16 = dq_term.loc[begin_time: end_time].reset_index(drop=False)
            cdq_16 = weighted_each_point(his_rp_term, dq_16, error)
            return cdq_16

        his_rp['moment'] = pd.to_datetime(his_rp['moment'])
        dq['Datetime'] = pd.to_datetime(dq['Datetime'])
        dq_area['Datetime'] = pd.to_datetime(dq_area['Datetime'])
        his_rp_plant = his_rp[his_rp['Grade'] == 1]
        his_rp_area = his_rp[his_rp['Grade'] == 0]

        all_weights = [dq_fix_weighted(his_rp_area, dq_area)]
        for plant_id, dq_id in dq.groupby('PlantID'):
            his_rp_id = his_rp_plant[his_rp_plant['ID'] == plant_id]
            all_weights.append(dq_fix_weighted(his_rp_id, dq_id))

        weighted_cols = ['Grade', 'Type', 'ID', 'Value'] + ['P' + str(x) for x in range(1, 17)]
        weighted_cdq = pd.DataFrame(all_weights, columns=weighted_cols)
        return weighted_cdq

    def post_processing(self, df, station_info):
        # df is the weighted ultra-short-term frame; station_info carries per-plant capacities
        # Step 1: set aside rows with Grade == 0 (area-level rows)
        grade_zero_mask = df['Grade'] == 0
        grade_zero_df = df[grade_zero_mask].copy()
        non_grade_zero_df = df[~grade_zero_mask].copy()

        # Step 2: merge in PlantCap
        merged_df = non_grade_zero_df.merge(
            station_info[['PlantID', 'PlantCap']],
            left_on='ID',
            right_on='PlantID',
            how='left'
        ).drop(columns=['PlantID'])  # drop the redundant PlantID column

        # Step 3: process columns P1-P16 --
        # clamp values above PlantCap to PlantCap and values below 0 to 0
        p_columns = [f'P{i}' for i in range(1, 17)]
        merged_df[p_columns] = merged_df[p_columns].clip(
            lower=0,
            upper=merged_df['PlantCap'],
            axis=0
        ).round(2)  # keep two decimal places

        merged_df.drop(columns=['PlantCap'], inplace=True)  # drop the temporary column

        # Step 4: concatenate the processed rows with the Grade == 0 rows
        final_df = pd.concat([merged_df, grade_zero_df], axis=0).reset_index(drop=True)
        return final_df

    def cdq_task(self, config):
        """Ultra-short-term (CDQ) correction task."""
        station_id = -99
        try:
            print("111")
            # Dynamically generated station data path
            print("222")
            # Load data
            data_objects = self.loader.get_material_cdq()
            print("333")
            dq = data_objects.dq
            dq_area = data_objects.dq_area
            his_rp = data_objects.cdq_his_rp
            begin_time, end_time = data_objects.begin_time, data_objects.end_time
            station_info = data_objects.station_info
            weighted_cdq = self.calculate_dq_fix_weighted(dq, dq_area, his_rp, begin_time, end_time)
            weighted_cdq = self.post_processing(weighted_cdq, station_info)
            print("444")
            out_dir_cdq = str(os.path.join(config['cdqyc_base_path'], config['moment'], config['input_file']))
            # Write the result next to the input file, mapping the 'IN' directory to 'OUT'
            out_dir_cdq = out_dir_cdq.replace('IN', 'OUT')
            weighted_cdq.to_csv(out_dir_cdq, index=False)
            print("555")
            return {'status': 'success', 'station_id': station_id}
        except Exception as e:
            logging.error(f"Station {station_id} failed: {str(e)}")
            return {'status': 'failed', 'station_id': station_id}
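

# Hypothetical illustration (not used by the production tasks): a minimal sketch of the
# capacity clipping performed in CDQTaskPre.post_processing on synthetic data. The plant
# ID, capacity and power values below are made-up placeholders.
def _demo_capacity_clipping():
    station_info = pd.DataFrame({'PlantID': ['P001'], 'PlantCap': [100.0]})
    row = {'Grade': 1, 'Type': 1, 'ID': 'P001', 'Value': 80.0}
    # P1..P16 contain one value below zero and one above the 100 MW capacity
    row.update({f'P{i}': 50.0 for i in range(1, 17)})
    row['P1'], row['P2'] = -5.0, 120.0
    df = pd.DataFrame([row])
    # loader/opt are not touched by post_processing, so None stands in here
    clipped = CDQTaskPre(loader=None, opt=None).post_processing(df, station_info)
    return clipped  # P1 -> 0.0, P2 -> 100.0, the remaining points unchanged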
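

# Hypothetical illustration of the per-point blending rule used inside
# CDQTaskPre.calculate_dq_fix_weighted: for point T_i,
#   cdq = dq_fix * coe['T_i']['n'] + Power * coe['T_i']['m'],
# where dq_fix = Power + mean(recent actual power - short-term forecast).
# The coefficient and power values below are made-up placeholders, not production settings.
if __name__ == '__main__':
    demo_coe = {'T1': {'n': 0.7, 'm': 0.3}}   # structure mirrors opt.coe; values assumed
    power, error = 52.0, 3.5                  # raw short-term value and recent mean bias
    dq_fix = power + error
    cdq = dq_fix * demo_coe['T1']['n'] + power * demo_coe['T1']['m']
    print(f"blended T1 value: {cdq:.2f}")     # 0.7 * 55.5 + 0.3 * 52.0 = 54.45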