David, 4 days ago
commit dbd018acde
3 changed files with 45 additions and 156 deletions
  1. app/model/material.py (+2 -2)
  2. app/model/task_worker.py (+43 -10)
  3. app/model/tf_region_train.py (+0 -144)

+ 2 - 2
app/model/material.py

@@ -147,8 +147,8 @@ class MaterialLoader:
                 station_info_d = pd.read_csv(self.wrapper_path_cdq(self.opt.doc_cdq_mapping['station_info_d_s']), sep=r'\s+', header=0)
                 cap = float(station_info.loc[0, 'PlantCap'])
                # Load the current day's short-term forecast from the short-term prediction results
-                dq, dq_area = self._load_dq_res(area_id, plant_type)
-                cdq_his_rp = self._load_cdq_history_rp()
+                dq, dq_area = self._load_dq_res(area_id, plant_type, begin_time)
+                cdq_his_rp = self._load_cdq_history_rp(area_id, plant_type)
 
             return types.SimpleNamespace(**{
                 'station_info': station_info,
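
The updated call sites above imply that both loader helpers now take extra arguments. A minimal sketch of the assumed signatures (illustrative stubs only, not the actual MaterialLoader implementation):

# Assumed signatures implied by the new call sites; bodies are placeholders.
def _load_dq_res(self, area_id, plant_type, begin_time):
    """Load short-term (DQ) results for the area/plant type from begin_time."""
    ...

def _load_cdq_history_rp(self, area_id, plant_type):
    """Load historical reported power used for the ultra-short-term (CDQ) fix."""
    ...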

+ 43 - 10
app/model/task_worker.py

@@ -5,7 +5,7 @@
 # @Author    :David
 # @Company: shenyang JY
 
-import logging
+import logging, os
 import pandas as pd
 from scipy.cluster.hierarchy import weighted
 
@@ -13,8 +13,6 @@ from app.model.tf_model_train import ModelTrainer
 from app.predict.tf_model_pre import ModelPre
 from app.common.data_cleaning import key_field_row_cleaning
 from app.common.config import logger
-from app.model.tf_region_train import RegionTrainer
-from app.model.material import MaterialLoader
 
 
 class TaskTrain(object):
@@ -143,6 +141,7 @@ class CDQTaskPre(object):
         2. Align the collected historical power with the short-term timestamps and compute the average over the past 3 time points
         3. Fetch the 16 short-term points between the start and end times and weight the short-term forecast by the deviation
         """
+
         def weighted_each_point(his_rp_term, dq_16, error):
             weighted = list(his_rp_term.head(1)[['Grade', 'Type', 'ID', 'Value']].values[0])
             dq_16['dq_fix'] = dq_16['Power'] + error
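
A toy illustration of the correction sketched in the docstring above, assuming `error` is the mean (actual - forecast) deviation over the last 3 aligned points; the data and column names below are hypothetical:

import pandas as pd

# Hypothetical last-3 aligned pairs of actual vs. short-term forecast power.
recent = pd.DataFrame({'actual': [10.0, 11.0, 12.0],
                       'forecast': [9.0, 10.5, 11.0]})
error = (recent['actual'] - recent['forecast']).mean()  # (1.0 + 0.5 + 1.0) / 3 = 0.83

# Shift the 16 short-term points by the mean deviation, as in dq_fix above.
dq_16 = pd.DataFrame({'Power': [float(x) for x in range(1, 17)]})
dq_16['dq_fix'] = dq_16['Power'] + error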
@@ -170,15 +169,47 @@ class CDQTaskPre(object):
         dq_area['Datetime'] = pd.to_datetime(dq_area['Datetime'])
         his_rp_plant = his_rp[his_rp['Grade']==1]
         his_rp_area = his_rp[his_rp['Grade']==0]
-        all = []
-        all.append(dq_fix_weighted(his_rp_area, dq_area))
+        all_weights = [dq_fix_weighted(his_rp_area, dq_area)]
         for id, dq_id in dq.groupby('PlantID'):
             his_rp_id = his_rp_plant[his_rp_plant['ID']==id]
-            all.append(dq_fix_weighted(his_rp_id, dq_id))
+            all_weights.append(dq_fix_weighted(his_rp_id, dq_id))
         weighted_cols = ['Grade', 'Type', 'ID', 'Value'] + ['P'+str(x) for x in range(1, 17)]
-        weighted_cdq = pd.DataFrame(all, columns=weighted_cols)
+        weighted_cdq = pd.DataFrame(all_weights, columns=weighted_cols)
         return weighted_cdq
 
+
+    def post_processing(self, df, station_info):
+        # df is the weighted CDQ DataFrame; station_info holds per-plant metadata (PlantID, PlantCap)
+
+        # Step 1: split out rows with Grade == 0
+        grade_zero_mask = df['Grade'] == 0
+        grade_zero_df = df[grade_zero_mask].copy()
+        non_grade_zero_df = df[~grade_zero_mask].copy()
+
+        # Step 2: merge in PlantCap info
+        merged_df = non_grade_zero_df.merge(
+            station_info[['PlantID', 'PlantCap']],
+            left_on='ID',
+            right_on='PlantID',
+            how='left'
+        ).drop(columns=['PlantID'])  # drop the redundant PlantID column
+
+        # Step 3: process the P1-P16 columns
+        p_columns = [f'P{i}' for i in range(1, 17)]
+
+        # Cap values above PlantCap at PlantCap, and floor values below 0 at 0
+        merged_df[p_columns] = merged_df[p_columns].clip(
+            lower=0,
+            upper=merged_df['PlantCap'],
+            axis=0
+        ).round(2)  # keep two decimal places
+
+        merged_df.drop(columns=['PlantCap'], inplace=True)  # drop the temporary column
+
+        # Step 4: concatenate the processed rows with the Grade == 0 rows
+        final_df = pd.concat([merged_df, grade_zero_df], axis=0).reset_index(drop=True)
+        return final_df
+
     def cdq_task(self, config):
         """场站级训练任务"""
         station_id = -99
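
The clip in post_processing caps each plant's P1..P16 at that plant's own PlantCap. A self-contained demo of the same pandas pattern with toy values (not repo data):

import pandas as pd

df = pd.DataFrame({'P1': [-1.0, 5.0], 'P2': [3.0, 9.0]})
cap = pd.Series([4.0, 8.0])  # per-row PlantCap

# axis=0 aligns cap with the row index, so each row gets its own upper bound.
clipped = df.clip(lower=0, upper=cap, axis=0).round(2)
# row 0 -> P1=0.0, P2=3.0; row 1 -> P1=5.0, P2=8.0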
@@ -193,11 +224,13 @@ class CDQTaskPre(object):
             dq_area = data_objects.dq_area
             his_rp = data_objects.cdq_his_rp
             begin_time, end_time = data_objects.begin_time, data_objects.end_time
+            station_info = data_objects.station_info
             weighted_cdq = self.calculate_dq_fix_weighted(dq, dq_area, his_rp, begin_time, end_time)
+            weighted_cdq = self.post_processing(weighted_cdq, station_info)
             print("444")
-            # Model training
-            # model = ModelTrainer(station_id, train_data, capacity=data_objects.cap, gpu_id=config.get('gpu_assignment'))
-
+            out_dir_cdq = str(os.path.join(config['cdqyc_base_path'], config['moment'], config['input_file']))
+            out_dir_cdq = out_dir_cdq.replace('IN', 'OUT')  # str.replace returns a new string; reassign it
+            weighted_cdq.to_csv(out_dir_cdq, index=False)
             print("555")
             return {'status': 'success', 'station_id': station_id, 'weights': local_weights}
         except Exception as e:
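
Because str.replace returns a new string rather than mutating in place, the output path must be reassigned (as fixed above). A minimal illustration with hypothetical config values:

import os

# Hypothetical base path, moment, and input file for illustration only.
path = str(os.path.join('/data/cdqyc', '0800', 'IN/plant_01.csv'))
path = path.replace('IN', 'OUT')  # replace() is not in-place; the result must be kept
# path is now '/data/cdqyc/0800/OUT/plant_01.csv'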

+ 0 - 144
app/model/tf_region_train.py

@@ -1,144 +0,0 @@
-#!/usr/bin/env python
-# -*- coding:utf-8 -*-
-# @FileName  :tf_model_train.py
-# @Time      :2025/4/29 14:05
-# @Author    :David
-# @Company: shenyang JY
-
-import logging
-import os, json
-import time, argparse
-import traceback
-import pandas as pd
-from typing import Dict, Any
-from app.common.tf_lstm import TSHandler
-from app.common.dbmg import MongoUtils
-from app.common.data_handler_region import DataHandlerRegion, write_number_to_file
-from app.common.config import logger, parser
-
-class RegionTrainer:
-    """模型训练器封装类"""
-
-    def __init__(self,
-                 train_data: pd.DataFrame,
-                 capacity: float,
-                 config: Dict[str, Any] = None
-                 ):
-        self.config = config
-        self.logger = logger
-        self.train_data = train_data
-        self.capacity = capacity
-        self.gpu_id = config.get('gpu_assignment')
-        self._setup_resources()
-
-        # Initialize components
-        self.input_file = config.get("input_file")
-        self.opt = argparse.Namespace(**config)
-        self.dh = DataHandlerRegion(logger, self.opt)
-        self.ts = TSHandler(logger, self.opt)
-        self.mgUtils = MongoUtils(logger)
-
-    def _setup_resources(self):
-        """GPU资源分配"""
-        if self.gpu_id is not None:
-            os.environ["CUDA_VISIBLE_DEVICES"] = str(self.gpu_id)
-            self.logger.info(f"GPU {self.gpu_id} allocated")
-
-
-    def train(self):
-        """执行训练流程"""
-        # 获取程序开始时间
-        start_time = time.time()
-        success = 0
-        print("aaa")
-        farm_id = self.input_file.split('/')[-2]
-        output_file = self.input_file.replace('IN', 'OUT')
-        status_file = 'STATUS.TXT'
-        try:
-            # ------------ Fetch data and preprocess the training data ------------
-            self.dh.opt.cap = self.capacity
-            train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = self.dh.train_data_handler(self.train_data)
-            self.ts.opt.Model['input_size'] = train_x.shape[2]
-            # ------------ Train and save the model ------------
-            # 1. In add_train (incremental) mode, load the pretrained model's feature parameters first, then preprocess the training data
-            # 2. In normal mode, preprocess the training data first, then load the model according to the training-data features
-            print("bbb")
-            model = self.ts.train_init() if self.ts.opt.Model['add_train'] else self.ts.get_keras_model(self.ts.opt)
-            if self.ts.opt.Model['add_train']:
-                if model:
-                    feas = json.loads(self.ts.model_params).get('features', self.dh.opt.features)
-                    if set(feas).issubset(set(self.dh.opt.features)):
-                        self.dh.opt.features = list(feas)
-                        train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = self.dh.train_data_handler(self.train_data)
-                    else:
-                        model = self.ts.get_keras_model(self.ts.opt)
-                        self.logger.info("Training-data features do not cover the pretrained model's features")
-                else:
-                    model = self.ts.get_keras_model(self.ts.opt)
-            print("ccc")
-            # Run the training
-            trained_model = self.ts.training(model, [train_x, valid_x, train_y, valid_y])
-            # Persist the model
-            success = 1
-            print('ddd')
-            # Update algorithm status: 1 = started successfully
-            write_number_to_file(os.path.join(output_file, status_file), 1, 1, 'rewrite')
-            # ------------ Assemble model data ------------
-            self.opt.Model['features'] = ','.join(self.dh.opt.features)
-            self.config.update({
-                'params': json.dumps(self.config['Model']),
-                'descr': f'Southern Grid competition-{farm_id}',
-                'gen_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
-                'model_table': self.config['model_table'] + farm_id,
-                'scaler_table': self.config['scaler_table'] + farm_id
-            })
-            self.mgUtils.insert_trained_model_into_mongo(trained_model, self.config)
-            self.mgUtils.insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, self.config)
-            # Update algorithm status: finished normally
-            print("eee")
-            write_number_to_file(os.path.join(output_file, status_file), 2, 2)
-            return True
-        except Exception as e:
-            self._handle_error(e)
-            return False
-
-    def _initialize_model(self):
-        """模型初始化策略"""
-        if self.ts.opt.Model['add_train']:
-            pretrained = self.ts.train_init()
-            return pretrained if self._check_feature_compatibility(pretrained) else self.ts.get_keras_model()
-        return self.ts.get_keras_model()
-
-    def _check_feature_compatibility(self, model) -> bool:
-        """检查特征兼容性"""
-        # 原始逻辑中的特征校验实现
-        pass
-
-
-    def _handle_error(self, error: Exception):
-        """统一错误处理"""
-        error_msg = traceback.format_exc()
-        self.logger.error(f"Training failed: {str(error)}\n{error_msg}")
-
-
-
-
-
-# Usage example
-if __name__ == "__main__":
-    config = {
-        'base_path': '/data/power_forecast',
-        'capacities': {
-            '1001': 2.5,
-            '1002': 3.0,
-            # ... other station configs
-        },
-        'gpu_assignment': [0, 1, 2, 3]  # list of available GPUs
-    }
-
-    orchestrator = TrainingOrchestrator(
-        station_ids=['1001', '1002', '1003'],  # in practice, hundreds of IDs would be generated
-        config=config,
-        max_workers=4  # set according to the number of GPUs
-    )
-    orchestrator.execute()