David 3 days ago
parent
commit
cd071a12a0

+ 304 - 0
app/common/data_handler_region.py

@@ -0,0 +1,304 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :data_handler_region.py
+# @Time      :2025/1/8 14:56
+# @Author    :David
+# @Company: shenyang JY
+import argparse, numbers, joblib
+import numpy as np
+import pandas as pd
+from io import BytesIO
+from bson.decimal128 import Decimal128
+from sklearn.preprocessing import MinMaxScaler
+from app.common.data_cleaning import *
+
+class DataHandlerRegion(object):
+    def __init__(self, logger, opt):
+        self.logger = logger
+        self.opt = opt
+
+    def get_train_data(self, dfs, col_time, target):
+        train_x, valid_x, train_y, valid_y = [], [], [], []
+        for i, df in enumerate(dfs, start=1):
+            if len(df) < self.opt.Model["time_step"]:
+                self.logger.info("Feature processing - training data - fewer rows than time_step")
+                continue
+            datax, datay = self.get_timestep_features(df, col_time, target, is_train=True)
+            if len(datax) < 10:
+                self.logger.info("Feature processing - training data - not enough samples for the minimum split")
+                continue
+            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+            train_x.extend(tx)
+            valid_x.extend(vx)
+            train_y.extend(ty)
+            valid_y.extend(vy)
+
+        train_y = np.array([y.iloc[:, 1].values for y in train_y])
+        valid_y = np.array([y.iloc[:, 1].values for y in valid_y])
+
+        train_x = np.array([x.values for x in train_x])
+        valid_x = np.array([x.values for x in valid_x])
+
+        return train_x, valid_x, train_y, valid_y
+
+    def get_predict_data(self, dfs):
+        test_x = []
+        for i, df in enumerate(dfs, start=1):
+            if len(df) < self.opt.Model["time_step"]:
+                self.logger.info("特征处理-预测数据-不满足time_step")
+                continue
+            datax = self.get_predict_features(df)
+            test_x.append(datax)
+        test_x = np.concatenate(test_x, axis=0)
+        return test_x
+
+    def get_predict_features(self, norm_data):
+        """
+        均分数据,获取预测数据集
+        """
+        time_step = self.opt.Model["time_step"]
+        feature_data = norm_data.reset_index(drop=True)
+        time_step_loc = time_step - 1
+        iters = len(feature_data) // time_step
+        end = len(feature_data) % time_step
+        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, self.opt.features].reset_index(drop=True) for i in range(iters)])
+        if end > 0:
+            df = feature_data.tail(end)
+            df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[0]]* (time_step-end))]).reset_index(drop=True)
+            features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
+        return features_x
+
+    def get_timestep_features(self, norm_data, col_time, target, is_train):
+        """
+        步长分割数据,获取时序训练集
+        """
+        time_step = self.opt.Model["time_step"]
+        feature_data = norm_data.reset_index(drop=True)
+        time_step_loc = time_step - 1
+        train_num = int(len(feature_data))
+        label_features = [col_time, target]
+        nwp_cs = self.opt.features
+        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]  # database fields, e.g. 'C_T': 'C_WS170'
+        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
+        features_x, features_y = [], []
+        for i, row in enumerate(zip(nwp, labels)):
+            features_x.append(row[0])
+            features_y.append(row[1])
+        return features_x, features_y
+
+    def fill_train_data(self, unite, col_time):
+        """
+        补值
+        """
+        unite[col_time] = pd.to_datetime(unite[col_time])
+        unite['time_diff'] = unite[col_time].diff()
+        dt_short = pd.Timedelta(minutes=15)
+        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
+        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
+        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
+        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
+        self.logger.info("再次测算,需要插值的总点数为:{}".format(miss_number))
+        if miss_number > 0 and self.opt.Model["train_data_fill"]:
+            data_train = self.data_fill(data_train, col_time)
+        return data_train
+
+    def fill_pre_data(self, unite):
+        unite = unite.interpolate(method='linear')  # first fill NWP values by linear interpolation
+        unite = unite.ffill().bfill() # then forward/backward fill the edge points that interpolation cannot reach
+        return unite
+
+    def missing_time_splite(self, df, dt_short, dt_long, col_time):
+        df.reset_index(drop=True, inplace=True)
+        n_long, n_short, n_points = 0, 0, 0
+        start_index = 0
+        dfs = []
+        for i in range(1, len(df)):
+            if df['time_diff'][i] >= dt_long:
+                df_long = df.iloc[start_index:i, :-1]
+                dfs.append(df_long)
+                start_index = i
+                n_long += 1
+            if df['time_diff'][i] > dt_short:
+                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
+                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
+                self.logger.info("缺失点数:{}".format(points))
+                if df['time_diff'][i] < dt_long:
+                    n_short += 1
+                    n_points += points
+                    self.logger.info("需要补值的点数:{}".format(points))
+        dfs.append(df.iloc[start_index:, :-1])
+        self.logger.info(f"数据总数:{len(df)}, 时序缺失的间隔:{n_short}, 其中,较长的时间间隔:{n_long}")
+        self.logger.info("需要补值的总点数:{}".format(n_points))
+        return dfs
+
+    def data_fill(self, dfs, col_time, test=False):
+        dfs_fill, inserts = [], 0
+        for i, df in enumerate(dfs):
+            df = rm_duplicated(df, self.logger)
+            df1 = df.set_index(col_time, inplace=False)
+            dff = df1.resample('15min').interpolate(method='linear')  # linear interpolation; other filling methods still need to be compared
+            dff.reset_index(inplace=True)
+            points = len(dff) - len(df1)
+            dfs_fill.append(dff)
+            self.logger.info("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
+            inserts += points
+        name = "预测数据" if test is True else "训练集"
+        self.logger.info("{}分成了{}段,实际一共补值{}点".format(name, len(dfs_fill), inserts))
+        return dfs_fill
+
+    def train_valid_split(self, datax, datay, valid_rate, shuffle):
+        shuffle_index = np.random.permutation(len(datax))
+        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
+        valid_size = int(len(datax) * valid_rate)
+        valid_index = indexs[-valid_size:] if valid_size > 0 else []
+        train_index = indexs[:-valid_size] if valid_size > 0 else indexs
+        tx, vx, ty, vy = [], [], [], []
+        for i, data in enumerate(zip(datax, datay)):
+            if i in train_index:
+                tx.append(data[0])
+                ty.append(data[1])
+            elif i in valid_index:
+                vx.append(data[0])
+                vy.append(data[1])
+        return tx, vx, ty, vy
+
+    def train_data_handler(self, data, bp_data=False):
+        """
+        训练数据预处理:
+        清洗+补值+归一化
+        Args:
+            data: 从mongo中加载的数据
+            opt:参数命名空间
+        return:
+            x_train
+            x_valid
+            y_train
+            y_valid
+        """
+        col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target
+        # drop curtailment (power-limited) records
+        if 'is_limit' in data.columns:
+            data = data[data['is_limit'] == False]
+        # select features, convert to numeric, sort by time
+        train_data = data[[col_time] + features + [target]]
+        train_data = train_data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
+        train_data = train_data.sort_values(by=col_time)
+        # drop days whose average feature missing rate exceeds 20%
+        # train_data = missing_features(train_data, features, col_time)
+        # feature preprocessing after the curtailment records have been removed:
+        # 1. clean NaNs and outliers
+        train_data_cleaned = cleaning(train_data, 'training set', self.logger, features + [target], col_time=col_time)
+        self.opt.features = [x for x in train_data_cleaned.columns.tolist() if x not in [target, col_time] and x in features]
+        # 2. normalisation
+        # create scalers for the features and the target
+        train_scaler = MinMaxScaler(feature_range=(0, 1))
+        target_scaler = MinMaxScaler(feature_range=(0, 1))
+        # scale the features and the target
+        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[self.opt.features])
+        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
+        scaled_cap = target_scaler.transform(np.array([[float(self.opt.cap)]]))[0,0]
+        train_data_cleaned[self.opt.features] = scaled_train_data
+        train_data_cleaned[[target]] = scaled_target
+        # 3. fill missing values
+        train_datas = self.fill_train_data(train_data_cleaned, col_time)
+        # serialise both scalers
+        scaled_train_bytes = BytesIO()
+        scaled_target_bytes = BytesIO()
+        joblib.dump(train_scaler, scaled_train_bytes)
+        joblib.dump(target_scaler, scaled_target_bytes)
+        scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
+        scaled_target_bytes.seek(0)
+
+        if bp_data:
+            train_data = pd.concat(train_datas, axis=0)
+            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[self.opt.features].values, train_data[target].values, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+            train_x, valid_x, train_y, valid_y =  np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
+        else:
+            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, target)
+        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
+
+    def pre_data_handler(self, data, feature_scaler, bp_data=False):
+        """
+        预测数据简单处理
+        Args:
+            data: 从mongo中加载的数据
+            opt:参数命名空间
+        return:
+            scaled_features: 反归一化的特征
+        """
+        # drop curtailment (power-limited) records
+        if 'is_limit' in data.columns:
+            data = data[data['is_limit'] == False]
+        # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
+        #     args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
+        col_time, features = self.opt.col_time, self.opt.features
+        data = data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
+        data = data.sort_values(by=col_time).reset_index(drop=True, inplace=False)
+        if not set(features).issubset(set(data.columns.tolist())):
+            raise ValueError("预测数据特征不满足模型特征!")
+        pre_data = data[features].copy()
+        if self.opt.Model['predict_data_fill']:
+            pre_data = self.fill_pre_data(pre_data)
+        pre_data.loc[:, features] = feature_scaler.transform(pre_data)
+        if bp_data:
+            pre_x = np.array(pre_data)
+        else:
+            pre_x = self.get_predict_data([pre_data])
+        return pre_x, data
+
+def write_number_to_file(file_path, line_number, number, mode='overwrite'):
+    """
+    向指定文件写入数字(支持清空重写和覆盖两种模式)
+
+    参数:
+    file_path (str): 文件路径
+    line_number (int): 行号(从1开始)
+    number (int/float): 要写入的数字
+    mode (str): 写入模式 - 'overwrite'覆盖指定行 或 'rewrite'清空重写
+
+    返回:
+    bool: 操作是否成功
+
+    示例:
+    write_number_to_file("data.txt", 3, 42.5)          # 覆盖模式
+    write_number_to_file("data.txt", 2, 99, 'rewrite') # 清空重写模式
+    """
+    try:
+        # validate arguments
+        if line_number < 1:
+            raise ValueError("line_number must be >= 1")
+        if mode not in ['overwrite', 'rewrite']:
+            raise ValueError("mode must be 'overwrite' or 'rewrite'")
+
+        # convert the number to a newline-terminated string
+        number_str = f"{number}\n"
+        target_index = line_number - 1
+
+        # handle the two modes
+        if mode == 'rewrite':
+            # build fresh content from scratch
+            lines = ['\n'] * target_index  # pad with leading blank lines
+            lines.append(number_str)
+        else:
+            # overwrite mode: read the existing lines, then replace the target one
+            try:
+                with open(file_path, 'r', encoding='utf-8') as f:
+                    lines = f.readlines()
+            except FileNotFoundError:
+                lines = []
+
+            # extend the file to the target line
+            while len(lines) <= target_index:
+                lines.append('\n')
+
+            # replace the target line
+            lines[target_index] = number_str
+
+        # write back to the file
+        with open(file_path, 'w', encoding='utf-8') as f:
+            f.writelines(lines)
+
+        return True
+
+    except Exception as e:
+        print(f"操作失败: {str(e)}")
+        return False
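
For readers following the windowing logic above, here is a minimal, self-contained sketch of the chunk-and-pad scheme that get_predict_features implements: rows are cut into consecutive windows of length time_step, and a trailing partial window is padded by repeating its first row. Column names and the time_step value are illustrative, not the project's configuration.

import numpy as np
import pandas as pd

def chunk_and_pad(df: pd.DataFrame, time_step: int) -> np.ndarray:
    """Cut rows into consecutive windows of length time_step; pad the last
    partial window by repeating its first row so all windows share one shape."""
    n_full, remainder = len(df) // time_step, len(df) % time_step
    windows = [df.iloc[i * time_step:(i + 1) * time_step].to_numpy()
               for i in range(n_full)]
    if remainder:
        tail = df.tail(remainder)
        pad = pd.concat([tail, pd.DataFrame([tail.iloc[0]] * (time_step - remainder))],
                        ignore_index=True)
        windows.append(pad.to_numpy())
    return np.stack(windows)       # shape: (n_windows, time_step, n_features)

if __name__ == "__main__":
    demo = pd.DataFrame({"ws": range(10), "temp": range(10, 20)})
    print(chunk_and_pad(demo, time_step=4).shape)   # -> (3, 4, 2)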
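Similarly, the gap-filling strategy in fill_train_data / data_fill boils down to re-gridding each segment onto the 15-minute axis and linearly interpolating the missing points. A small stand-alone illustration; column names are made up for the demo.

import pandas as pd

def fill_short_gaps(df: pd.DataFrame, time_col: str = "ts") -> pd.DataFrame:
    """Re-grid a segment onto a fixed 15-minute axis and linearly interpolate
    the points that were missing (analogous to data_fill above)."""
    filled = df.sort_values(time_col).set_index(time_col).resample("15min").interpolate(method="linear")
    return filled.reset_index()

if __name__ == "__main__":
    ts = pd.to_datetime(["2025-01-08 00:00", "2025-01-08 00:15", "2025-01-08 01:00"])  # 00:30, 00:45 missing
    demo = pd.DataFrame({"ts": ts, "power": [1.0, 2.0, 5.0]})
    print(fill_short_gaps(demo))   # five rows, the two gaps interpolated to 3.0 and 4.0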

+ 5 - 4
app/model/main.py

@@ -15,7 +15,7 @@ from copy import deepcopy
 from concurrent.futures import ProcessPoolExecutor
 from app.common.config import parser, logger
 from app.model.resource_manager import ResourceController
-from app.model.task_worker import station_task
+from app.model.task_worker import station_task, region_task
 """"
Call flow
   xxxx 1. Get the IN and OUT file locations from the entry arguments xxxx
@@ -50,8 +50,8 @@ def main():
     all_stations = [str(child) for child in Path(opt.input_file).iterdir() if child.is_dir()]
     # task_func = partial(station_task, config=config)
 
-    # ---------------------------- Monitoring tasks ----------------------------
-    # Progress tracking
+    # ---------------------------- Monitoring tasks, progress tracking ----------------------------
+    # Station-level power forecast training
     completed = 0
     with tqdm(total=len(all_stations)) as pbar:
         with ProcessPoolExecutor(max_workers=rc.cpu_cores) as executor:
@@ -77,7 +77,8 @@ def main():
                 pbar.set_postfix_str(f"Completed: {completed}/{len(all_stations)}")
 
     print(f"Final result: {completed} stations trained successfully")
-
+    # Region-level power forecast training
+    region_task(all_stations, config)
 
 if __name__ == "__main__":
     main()
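
For context, the surrounding hunk drives station_task through a process pool while a tqdm bar tracks completions. A generic sketch of that submit-and-count pattern, with a placeholder task standing in for the real training job, looks roughly like this.

from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm

def train_one_station(station_id: str) -> dict:
    # placeholder for the real per-station training job (station_task in the repo)
    return {"status": "success", "station_id": station_id}

def run_all(stations, workers=4):
    completed = 0
    with tqdm(total=len(stations)) as pbar:
        with ProcessPoolExecutor(max_workers=workers) as pool:
            futures = {pool.submit(train_one_station, s): s for s in stations}
            for fut in as_completed(futures):
                if fut.result().get("status") == "success":
                    completed += 1
                pbar.update(1)
                pbar.set_postfix_str(f"Completed: {completed}/{len(stations)}")
    return completed

if __name__ == "__main__":
    print(run_all([f"station_{i}" for i in range(8)], workers=2))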

+ 22 - 2
app/model/task_worker.py

@@ -36,6 +36,26 @@ def station_task(config):
         return {'status': 'failed', 'station_id': station_id}
 
 
-def area_task():
+def region_task(all_stations, config):
     """区域级训练任务"""
-    pass
+    station_id = config.get('station_id')
+    try:
+        mate = MaterialLoader(base_path=config['input_file'])
+        # load data (the station data paths are generated dynamically)
+        data_objects = mate.get_material(station_id)
+        # merge NWP features and measured power on the time column
+        train_data = pd.merge(data_objects.nwp_v_h, data_objects.power, on=config['col_time'])
+        # train the model
+        # model = ModelTrainer(station_id, train_data, capacity=data_objects.cap, gpu_id=config.get('gpu_assignment'))
+        model = ModelTrainer(train_data, capacity=data_objects.cap, config=config)
+        model.train()
+        return {'status': 'success', 'station_id': station_id}
+    except Exception as e:
+        logging.error(f"Station {station_id} failed: {str(e)}")
+        return {'status': 'failed', 'station_id': station_id}
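
The merge step above joins the NWP feature frame to the measured power frame on the configured time column. A tiny illustration with made-up column names:

import pandas as pd

# made-up column names; the real ones come from config['col_time'] and the NWP schema
times = pd.date_range("2025-01-08", periods=4, freq="15min")
nwp = pd.DataFrame({"date_time": times, "ws_170": [6.1, 6.4, 6.0, 5.8]})
power = pd.DataFrame({"date_time": times, "power": [1.2, 1.3, 1.1, 1.0]})

# inner join on the shared timestamp, as region_task does with nwp_v_h and power
train_data = pd.merge(nwp, power, on="date_time")
print(train_data)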

+ 144 - 0
app/model/tf_region_train.py

@@ -0,0 +1,144 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :tf_region_train.py
+# @Time      :2025/4/29 14:05
+# @Author    :David
+# @Company: shenyang JY
+
+import logging
+import os, json
+import time, argparse
+import traceback
+import pandas as pd
+from typing import Dict, Any
+from app.common.tf_lstm import TSHandler
+from app.common.dbmg import MongoUtils
+from app.common.data_handler_region import DataHandlerRegion, write_number_to_file
+from app.common.config import logger, parser
+
+class RegionTrainer:
+    """模型训练器封装类"""
+
+    def __init__(self,
+                 train_data: pd.DataFrame,
+                 capacity: float,
+                 config: Dict[str, Any] = None
+                 ):
+        self.config = config
+        self.logger = logger
+        self.train_data = train_data
+        self.capacity = capacity
+        self.gpu_id = config.get('gpu_assignment')
+        self._setup_resources()
+
+        # initialise components
+        self.input_file = config.get("input_file")
+        self.opt = argparse.Namespace(**config)
+        self.dh = DataHandlerRegion(logger, self.opt)
+        self.ts = TSHandler(logger, self.opt)
+        self.mgUtils = MongoUtils(logger)
+
+    def _setup_resources(self):
+        """GPU资源分配"""
+        if self.gpu_id is not None:
+            os.environ["CUDA_VISIBLE_DEVICES"] = str(self.gpu_id)
+            self.logger.info(f"GPU {self.gpu_id} allocated")
+
+
+    def train(self):
+        """执行训练流程"""
+        # 获取程序开始时间
+        start_time = time.time()
+        success = 0
+        print("aaa")
+        farm_id = self.input_file.split('/')[-2]
+        output_file = self.input_file.replace('IN', 'OUT')
+        status_file = 'STATUS.TXT'
+        try:
+            # ------------ fetch and preprocess the training data ------------
+            self.dh.opt.cap = self.capacity
+            # train_data_handler returns (train_x, train_y, valid_x, valid_y, ...) - unpack in that order
+            train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = self.dh.train_data_handler(self.train_data)
+            self.ts.opt.Model['input_size'] = train_x.shape[2]
+            # ------------ train the model and persist it ------------
+            # 1. In incremental-training mode (add_train), load the pre-trained model's feature list first, then preprocess the training data
+            # 2. In normal mode, preprocess the training data first, then build the model from the resulting features
+            model = self.ts.train_init() if self.ts.opt.Model['add_train'] else self.ts.get_keras_model(self.ts.opt)
+            if self.ts.opt.Model['add_train']:
+                if model:
+                    feas = json.loads(self.ts.model_params).get('features', self.dh.opt.features)
+                    if set(feas).issubset(set(self.dh.opt.features)):
+                        self.dh.opt.features = list(feas)
+                        train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = self.dh.train_data_handler(self.train_data)
+                    else:
+                        model = self.ts.get_keras_model(self.ts.opt)
+                        self.logger.info("训练数据特征,不满足,加强训练模型特征")
+                else:
+                    model = self.ts.get_keras_model(self.ts.opt)
+            print("ccc")
+            # 执行训练
+            trained_model = self.ts.training(model, [train_x, valid_x, train_y, valid_y])
+            # 模型持久化
+            success = 1
+            print('ddd')
+            # 更新算法状态:1. 启动成功
+            write_number_to_file(os.path.join(output_file, status_file), 1, 1, 'rewrite')
+            # ------------ assemble the model metadata ------------
+            self.opt.Model['features'] = ','.join(self.dh.opt.features)
+            self.config.update({
+                'params': json.dumps(self.config['Model']),
+                'descr': f'Southern Grid competition-{farm_id}',
+                'gen_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
+                'model_table': self.config['model_table'] + farm_id,
+                'scaler_table': self.config['scaler_table'] + farm_id
+            })
+            self.mgUtils.insert_trained_model_into_mongo(trained_model, self.config)
+            self.mgUtils.insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, self.config)
+            # update the algorithm status: 2. finished normally
+            write_number_to_file(os.path.join(output_file, status_file), 2, 2)
+            return True
+        except Exception as e:
+            self._handle_error(e)
+            return False
+
+    def _initialize_model(self):
+        """模型初始化策略"""
+        if self.ts.opt.Model['add_train']:
+            pretrained = self.ts.train_init()
+            return pretrained if self._check_feature_compatibility(pretrained) else self.ts.get_keras_model(self.ts.opt)
+        return self.ts.get_keras_model(self.ts.opt)
+
+    def _check_feature_compatibility(self, model) -> bool:
+        """检查特征兼容性"""
+        # 原始逻辑中的特征校验实现
+        pass
+
+
+    def _handle_error(self, error: Exception):
+        """统一错误处理"""
+        error_msg = traceback.format_exc()
+        self.logger.error(f"Training failed: {str(error)}\n{error_msg}")
+
+
+
+
+
+# Usage example (illustrative only - in production the config comes from app.common.config)
+if __name__ == "__main__":
+    config = {
+        'input_file': '/data/power_forecast/IN/1001/',
+        'col_time': 'date_time',
+        'gpu_assignment': 0,            # GPU id to bind, or None for CPU-only
+        # ... remaining keys (Model, features, target, model_table, scaler_table, ...)
+    }
+    train_data = pd.DataFrame()         # placeholder: merged NWP + power data
+    trainer = RegionTrainer(train_data, capacity=2.5, config=config)
+    trainer.train()
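
The trainer serialises its MinMaxScaler objects into in-memory buffers before handing them to insert_scaler_model_into_mongo. A minimal sketch of that dump/seek/reload round-trip (the Mongo insertion itself is omitted):

from io import BytesIO
import joblib
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# fit a scaler on stand-in feature data
scaler = MinMaxScaler(feature_range=(0, 1)).fit(np.array([[0.0], [50.0], [100.0]]))

# dump into an in-memory buffer, as train_data_handler does before the Mongo insert
buf = BytesIO()
joblib.dump(scaler, buf)
buf.seek(0)   # rewind so the consumer reads from the start of the byte stream

# later (e.g. in pre_data_handler at prediction time) the scaler is restored from the stored bytes
restored = joblib.load(buf)
print(restored.transform(np.array([[25.0]])))   # [[0.25]]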