#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :task_worker.py
# @Time :2025/4/29 11:05
# @Author :David
# @Company: shenyang JY
import os

import pandas as pd

from app.model.tf_model_train import ModelTrainer
from app.predict.tf_model_pre import ModelPre
from app.common.data_cleaning import key_field_row_cleaning
from app.common.config import logger


class TaskTrain(object):
    def __init__(self, loader):
        self.loader = loader

    def station_task(self, config):
        """Station-level training task."""
        station_id = -99
        try:
            station_id = config['station_id']
            # Load station material: NWP features, measured power, installed capacity
            data_objects = self.loader.get_material(station_id)
            local_weights = self.loader.add_weights(data_objects)
            # Merge NWP features (nwp_v_h) with measured power on the time column
            train_data = pd.merge(data_objects.nwp_v_h, data_objects.power, on=config['col_time'])
            # Train the station-level model
            model = ModelTrainer(train_data, capacity=data_objects.cap, config=config)
            model.train()
            return {'status': 'success', 'station_id': station_id, 'weights': local_weights}
        except Exception as e:
            logger.error(f"Station {station_id} failed: {e}")
            return {'status': 'failed', 'station_id': station_id}

    def region_task(self, config, data_nwps):
        """Region-level training task."""
        area_id = -99
        try:
            # Load region-level material
            data_objects = self.loader.get_material_region()
            config['area_id'] = data_objects.area_id
            area_id = data_objects.area_id
            logger.info("Accumulated regional installed capacity: {}, actual regional installed capacity: {}".format(
                data_nwps.total_cap, data_objects.area_cap))
            # Merge region NWP features (nwp_v_h) with regional measured power on the time column
            train_data = pd.merge(data_nwps.nwp_v_h, data_objects.power, on=config['col_time'])
            # Train the region-level model
            model = ModelTrainer(train_data, capacity=data_objects.area_cap, config=config)
            model.train(pre_area=True)
            return {'status': 'success', 'area_id': area_id}
        except Exception as e:
            logger.error(f"Area {area_id} failed: {e}")
            return {'status': 'failed', 'area_id': area_id}


class TaskPre(object):
    def __init__(self, loader):
        self.loader = loader

    def station_task(self, config):
        """Station-level prediction task."""
        station_id = -99
        try:
            station_id = config['station_id']
            # Load station material
            data_objects = self.loader.get_material(station_id)
            local_weights = self.loader.add_weights(data_objects)
            # Prediction input: the station's NWP feature table (nwp_v)
            pre_data = data_objects.nwp_v
            # Run the station-level model
            model = ModelPre(pre_data, capacity=data_objects.cap, config=config)
            model.predict()
            return {'status': 'success', 'station_id': station_id, 'weights': local_weights}
        except Exception as e:
            logger.error(f"Station {station_id} failed: {e}")
            return {'status': 'failed', 'station_id': station_id}

    def region_task(self, config, data_nwps):
        """Region-level prediction task."""
        area_id = -99
        try:
            # Load region-level material
            data_objects = self.loader.get_material_region()
            config['area_id'] = data_objects.area_id
            area_id = data_objects.area_id
            logger.info("Accumulated regional installed capacity: {}, actual regional installed capacity: {}".format(
                data_nwps.total_cap, data_objects.area_cap))
            # Prediction input: the region NWP feature table (nwp_v)
            pre_data = data_nwps.nwp_v
            # Run the region-level model
            model = ModelPre(pre_data, capacity=data_objects.area_cap, config=config)
            model.predict(pre_area=True)
            return {'status': 'success', 'area_id': area_id}
        except Exception as e:
            logger.error(f"Area {area_id} failed: {e}")
            return {'status': 'failed', 'area_id': area_id}


class CDQTaskPre(object):
    def __init__(self, loader, opt):
        self.loader = loader
        self.opt = opt

    def calculate_dq_fix_weighted(self, dq, dq_area, his_rp, begin_time, end_time):
        """
        1. Process the area and each station ID in turn.
        2. Align the historical power with the short-term forecast timestamps and
           compute the mean deviation over the most recent three time points.
        3. Take the 16 short-term points between begin_time and end_time and weight
           the short-term forecast according to that deviation.
        """
        def weighted_each_point(his_rp_term, dq_16, error):
            # The first four output columns carry the identification fields
            weighted_row = list(his_rp_term.head(1)[['Grade', 'Type', 'ID', 'Value']].values[0])
            # Bias-correct the short-term forecast, then blend it with the raw
            # forecast using the per-horizon coefficients m/n from self.opt.coe
            dq_16['dq_fix'] = dq_16['Power'] + error
            for point, row in dq_16.iterrows():
                T = 'T' + str(point + 1)
                cdq_value = row['dq_fix'] * self.opt.coe[T]['n'] + row['Power'] * self.opt.coe[T]['m']
                weighted_row.append(cdq_value)
            return weighted_row

        def dq_fix_weighted(his_rp_term, dq_term):
            his_rp_dq = pd.merge(his_rp_term, dq_term, left_on='moment', right_on='Datetime')
            his_rp_dq_clean = key_field_row_cleaning(his_rp_dq, ['Value'], logger)
            if not his_rp_dq_clean.empty:
                his_rp_dq_clean['error'] = his_rp_dq_clean['Value'] - his_rp_dq_clean['Power']
                error = his_rp_dq_clean['error'].mean()
            else:
                error = 0
            dq_term = dq_term.set_index('Datetime')
            dq_16 = dq_term.loc[begin_time: end_time].reset_index(drop=False)
            cdq_16 = weighted_each_point(his_rp_term, dq_16, error)
            return cdq_16

        his_rp['moment'] = pd.to_datetime(his_rp['moment'])
        dq['Datetime'] = pd.to_datetime(dq['Datetime'])
        dq_area['Datetime'] = pd.to_datetime(dq_area['Datetime'])
        his_rp_plant = his_rp[his_rp['Grade'] == 1]
        his_rp_area = his_rp[his_rp['Grade'] == 0]
        # Area first, then each plant
        all_weights = [dq_fix_weighted(his_rp_area, dq_area)]
        for plant_id, dq_id in dq.groupby('PlantID'):
            his_rp_id = his_rp_plant[his_rp_plant['ID'] == plant_id]
            all_weights.append(dq_fix_weighted(his_rp_id, dq_id))
        weighted_cols = ['Grade', 'Type', 'ID', 'Value'] + ['P' + str(x) for x in range(1, 17)]
        weighted_cdq = pd.DataFrame(all_weights, columns=weighted_cols)
        return weighted_cdq

    def post_processing(self, df, station_info):
        # Step 1: split off the Grade == 0 (area-level) rows; only station rows are clipped
        grade_zero_mask = df['Grade'] == 0
        grade_zero_df = df[grade_zero_mask].copy()
        non_grade_zero_df = df[~grade_zero_mask].copy()
        # Step 2: attach each plant's installed capacity (PlantCap)
        merged_df = non_grade_zero_df.merge(
            station_info[['PlantID', 'PlantCap']],
            left_on='ID',
            right_on='PlantID',
            how='left'
        ).drop(columns=['PlantID'])  # drop the redundant PlantID column
        # Step 3: clip the P1-P16 columns to [0, PlantCap] row by row and round to two decimals
        p_columns = [f'P{i}' for i in range(1, 17)]
        merged_df[p_columns] = merged_df[p_columns].clip(
            lower=0,
            upper=merged_df['PlantCap'],
            axis=0
        ).round(2)
        merged_df.drop(columns=['PlantCap'], inplace=True)  # remove the temporary column
        # Step 4: recombine the clipped station rows with the untouched Grade == 0 rows
        final_df = pd.concat([merged_df, grade_zero_df], axis=0).reset_index(drop=True)
        return final_df
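
    # A minimal sketch of what post_processing does, with made-up numbers: given a
    # station row with PlantCap = 50 and P1 = 61.237, P2 = -0.4, the returned row
    # holds P1 = 50.0 and P2 = 0.0; Grade == 0 (area) rows pass through unchanged
    # because no area capacity column is attached here.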

    def cdq_task(self, config):
        """Ultra-short-term (CDQ) correction task."""
        station_id = -99
        try:
            # Load short-term forecasts, historical power and station info
            data_objects = self.loader.get_material_cdq()
            dq = data_objects.dq
            dq_area = data_objects.dq_area
            his_rp = data_objects.cdq_his_rp
            begin_time, end_time = data_objects.begin_time, data_objects.end_time
            station_info = data_objects.station_info
            weighted_cdq = self.calculate_dq_fix_weighted(dq, dq_area, his_rp, begin_time, end_time)
            weighted_cdq = self.post_processing(weighted_cdq, station_info)
            # Write the result next to the input file, swapping the IN directory for OUT
            out_dir_cdq = str(os.path.join(config['cdqyc_base_path'], config['moment'], config['input_file']))
            out_dir_cdq = out_dir_cdq.replace('IN', 'OUT')
            weighted_cdq.to_csv(out_dir_cdq, index=False)
            return {'status': 'success', 'station_id': station_id}
        except Exception as e:
            logger.error(f"Station {station_id} failed: {e}")
            return {'status': 'failed', 'station_id': station_id}
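

# A minimal usage sketch, not part of the original module: "loader" and "config"
# are hypothetical stand-ins for whatever objects the real entry point builds
# (a loader exposing get_material()/add_weights(), and a dict with at least
# 'station_id' and 'col_time').
if __name__ == '__main__':
    loader = None  # placeholder: supply a real data loader here
    config = {'station_id': 'S001', 'col_time': 'Datetime'}  # placeholder config
    if loader is not None:
        result = TaskTrain(loader).station_task(config)
        logger.info("station {station_id}: {status}".format(**result))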