
Merge code updated from home: Merge branch 'dev_david' of http://git.jiayuepowertech.com:9022/anweiguo/algorithm_platform into dev_david

David 2 weeks ago
commit 0e4294b5d4

+ 262 - 0
data_processing/data_operation/custom_data_handler.py

@@ -0,0 +1,262 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :custom_data_handler.py
+# @Time      :2025/05/10 16:06
+# @Author    :David
+# @Company: shenyang JY
+import argparse, numbers, joblib
+import numpy as np
+import pandas as pd
+from io import BytesIO
+from bson.decimal128 import Decimal128
+from sklearn.preprocessing import MinMaxScaler
+from common.processing_data_common import missing_features, str_to_list
+from common.data_cleaning import *
+
+class CustomDataHandler(object):
+    def __init__(self, logger, args):
+        self.logger = logger
+        self.opt = argparse.Namespace(**args)
+
+    def get_train_data(self, dfs, col_time, target, time_series=1):
+        train_x, valid_x, train_y, valid_y = [], [], [], []
+        for i, df in enumerate(dfs, start=1):
+            if len(df) < self.opt.Model["time_step"]:
+                self.logger.info("特征处理-训练数据-不满足time_step")
+                continue
+            # the two-headed zone model needs zone labels alongside the whole-plant power label
+            datax, datay = self.get_timestep_features_zone(df, col_time, target, is_train=True, time_series=time_series)
+            if len(datax) < 10:
+                self.logger.info("特征处理-训练数据-无法进行最小分割")
+                continue
+            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+            train_x.extend(tx)
+            valid_x.extend(vx)
+            train_y.extend(ty)
+            valid_y.extend(vy)
+
+        train_y = [np.array([y[0].values for y in train_y]), np.array([y[1].iloc[:, 1].values for y in train_y])]
+        valid_y = [np.array([y[0].values for y in valid_y]), np.array([y[1].iloc[:, 1].values for y in valid_y])]
+
+        train_x = np.array([x.values for x in train_x])
+        valid_x = np.array([x.values for x in valid_x])
+
+        return train_x, valid_x, train_y, valid_y
+
+    def get_predict_data(self, dfs, time_series=1):
+        test_x = []
+        for i, df in enumerate(dfs, start=1):
+            if len(df) < self.opt.Model["time_step"]*time_series:
+                self.logger.info("特征处理-预测数据-不满足time_step")
+                continue
+            datax = self.get_predict_features(df, time_series)
+            test_x.append(datax)
+        test_x = np.concatenate(test_x, axis=0)
+        return test_x
+
+    def get_predict_features(self, norm_data, time_series=1):
+        """
+        Split the data into equal time_step windows to build the prediction dataset
+        """
+        time_step = self.opt.Model["time_step"]
+        feature_data = norm_data.reset_index(drop=True)
+        time_step *= int(time_series)
+        time_step_loc = time_step - 1
+        iters = int(len(feature_data)) // time_step
+        end = int(len(feature_data)) % time_step
+        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, self.opt.features].reset_index(drop=True) for i in range(iters)])
+        if end > 0:
+            df = feature_data.tail(end)
+            df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[-1]]* (time_step-end))]).reset_index(drop=True)
+            features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
+        return features_x
+
+    def get_timestep_features(self, norm_data, col_time, target, is_train, time_series=1):
+        """
+        Slide a fixed-length window over the data to build the time-series training set
+        """
+        time_step = self.opt.Model["time_step"]
+        feature_data = norm_data.reset_index(drop=True)
+        time_step_loc = time_step*time_series - 1
+        train_num = int(len(feature_data))
+        label_features = [col_time, target]
+        nwp_cs = self.opt.features
+        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]  # database field 'C_T': 'C_WS170'
+        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
+        features_x, features_y = [], []
+        for i, row in enumerate(zip(nwp, labels)):
+            features_x.append(row[0])
+            features_y.append(row[1])
+        return features_x, features_y
+
+    def get_timestep_features_zone(self, norm_data, col_time, target, is_train, time_series):
+        """
+        Slide a fixed-length window over the data for zone-level modelling
+        """
+        time_step = self.opt.Model["time_step"]
+        feature_data = norm_data.reset_index(drop=True)
+        time_step_loc = time_step*time_series - 1
+        train_num = int(len(feature_data))
+        label_features_power = [col_time, target]
+        label_features_zone = list(self.opt.zone.keys())
+        nwp_cs = self.opt.features
+        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
+        labels_power = [feature_data.loc[i:i + time_step_loc, label_features_power].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
+        labels_zone = [feature_data.loc[i:i + time_step_loc, label_features_zone].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
+        features_x, features_y = [], []
+        for i, row in enumerate(zip(nwp, labels_power, labels_zone)):
+            features_x.append(row[0])
+            features_y.append([row[2], row[1]])
+        return features_x, features_y
+
+    def fill_train_data(self, unite, col_time):
+        """
+        Fill gaps in the training data
+        """
+        unite[col_time] = pd.to_datetime(unite[col_time])
+        unite['time_diff'] = unite[col_time].diff()
+        dt_short = pd.Timedelta(minutes=15)
+        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
+        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
+        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
+        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
+        self.logger.info("再次测算,需要插值的总点数为:{}".format(miss_number))
+        if miss_number > 0 and self.opt.Model["train_data_fill"]:
+            data_train = self.data_fill(data_train, col_time)
+        return data_train
+
+    def fill_pre_data(self, unite):
+        unite = unite.interpolate(method='linear')  # linear interpolation of the NWP columns first
+        unite = unite.ffill().bfill()  # then forward/backward fill the edge points that interpolation cannot reach
+        return unite
+
+    def missing_time_splite(self, df, dt_short, dt_long, col_time):
+        df.reset_index(drop=True, inplace=True)
+        n_long, n_short, n_points = 0, 0, 0
+        start_index = 0
+        dfs = []
+        for i in range(1, len(df)):
+            if df['time_diff'][i] >= dt_long:
+                df_long = df.iloc[start_index:i, :-1]
+                dfs.append(df_long)
+                start_index = i
+                n_long += 1
+            if df['time_diff'][i] > dt_short:
+                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
+                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
+                self.logger.info("缺失点数:{}".format(points))
+                if df['time_diff'][i] < dt_long:
+                    n_short += 1
+                    n_points += points
+                    self.logger.info("需要补值的点数:{}".format(points))
+        dfs.append(df.iloc[start_index:, :-1])
+        self.logger.info(f"数据总数:{len(df)}, 时序缺失的间隔:{n_short}, 其中,较长的时间间隔:{n_long}")
+        self.logger.info("需要补值的总点数:{}".format(n_points))
+        return dfs
+
+    def data_fill(self, dfs, col_time, test=False):
+        dfs_fill, inserts = [], 0
+        for i, df in enumerate(dfs):
+            df = rm_duplicated(df, self.logger)
+            df1 = df.set_index(col_time, inplace=False)
+            dff = df1.resample('15T').interpolate(method='linear')  # linear interpolation; other fill strategies still need to be compared
+            dff.reset_index(inplace=True)
+            points = len(dff) - len(df1)
+            dfs_fill.append(dff)
+            self.logger.info("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
+            inserts += points
+        name = "预测数据" if test is True else "训练集"
+        self.logger.info("{}分成了{}段,实际一共补值{}点".format(name, len(dfs_fill), inserts))
+        return dfs_fill
+
+    def train_valid_split(self, datax, datay, valid_rate, shuffle):
+        shuffle_index = np.random.permutation(len(datax))
+        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
+        valid_size = int(len(datax) * valid_rate)
+        # guard against valid_size == 0, where indexs[-0:] would put every sample into the validation set
+        valid_index = indexs[-valid_size:] if valid_size > 0 else []
+        train_index = indexs[:-valid_size] if valid_size > 0 else indexs
+        tx, vx, ty, vy = [], [], [], []
+        for i, data in enumerate(zip(datax, datay)):
+            if i in train_index:
+                tx.append(data[0])
+                ty.append(data[1])
+            elif i in valid_index:
+                vx.append(data[0])
+                vy.append(data[1])
+        return tx, vx, ty, vy
+
+    def train_data_handler(self, data, time_series=1):
+        """
+        训练数据预处理:
+        清洗+补值+归一化
+        Args:
+            data: 从mongo中加载的数据
+            opt:参数命名空间
+        return:
+            x_train
+            x_valid
+            y_train
+            y_valid
+        """
+        col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target
+        # drop curtailment (power-limit) records
+        if 'is_limit' in data.columns:
+            data = data[data['is_limit'] == False]
+        # select features, convert to numeric, sort by time
+        train_data = data[[col_time] + features + [target]]
+        train_data = train_data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
+        train_data = train_data.sort_values(by=col_time)
+        # drop days whose average feature missing rate exceeds 20%
+        # train_data = missing_features(train_data, features, col_time)
+        # feature pre-processing of the curtailment-cleaned data:
+        # 1. clean null and outlier values
+        train_data_cleaned = cleaning(train_data, '训练集', self.logger, features + [target], col_time)
+        self.opt.features = [x for x in train_data_cleaned.columns.tolist() if x not in [target, col_time] and x in features]
+        # 2. normalisation
+        # create separate scalers for the features and the target
+        train_scaler = MinMaxScaler(feature_range=(0, 1))
+        target_scaler = MinMaxScaler(feature_range=(0, 1))
+        # scale the features and the target
+        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[self.opt.features])
+        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
+        scaled_cap = target_scaler.transform(np.array([[float(self.opt.cap)]]))[0,0]
+        train_data_cleaned[self.opt.features] = scaled_train_data
+        train_data_cleaned[[target]] = scaled_target
+        # 3. fill missing values
+        train_datas = self.fill_train_data(train_data_cleaned, col_time)
+        # serialize both scalers
+        scaled_train_bytes = BytesIO()
+        scaled_target_bytes = BytesIO()
+        joblib.dump(train_scaler, scaled_train_bytes)
+        joblib.dump(target_scaler, scaled_target_bytes)
+        scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
+        scaled_target_bytes.seek(0)
+
+        train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, target, time_series)
+        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
+
+    def pre_data_handler(self, data, feature_scaler, time_series=1):
+        """
+        预测数据简单处理
+        Args:
+            data: 从mongo中加载的数据
+            opt:参数命名空间
+        return:
+            scaled_features: 反归一化的特征
+        """
+        # 清洗限电记录
+        if 'is_limit' in data.columns:
+            data = data[data['is_limit'] == False]
+        # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
+        #     args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
+        col_time, features = self.opt.col_time, self.opt.features
+        data = data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
+        data = data.sort_values(by=col_time).reset_index(drop=True, inplace=False)
+        if not set(features).issubset(set(data.columns.tolist())):
+            raise ValueError("预测数据特征不满足模型特征!")
+        pre_data = data[features].copy()
+        if self.opt.Model['predict_data_fill']:
+            pre_data = self.fill_pre_data(pre_data)
+        pre_data.loc[:, features] = feature_scaler.transform(pre_data)
+
+        pre_x = self.get_predict_data([pre_data], time_series)
+        return pre_x, data
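The least obvious piece of the handler above is the tail padding in get_predict_features: the prediction frame is cut into windows of time_step rows, and a remainder that does not fill a whole window is padded by repeating its last row. A minimal standalone sketch of that split-and-pad idea, using plain pandas/NumPy with a made-up time_step and feature column rather than the project's configuration:

# Standalone sketch of the even-split + tail-padding idea; time_step=16 and the
# single 'speed10' column are illustrative values, not project settings.
import numpy as np
import pandas as pd

time_step = 16
data = pd.DataFrame({'speed10': np.arange(40, dtype=float)})  # 40 rows: not a multiple of 16

iters, end = len(data) // time_step, len(data) % time_step
windows = [data.iloc[i * time_step:(i + 1) * time_step].reset_index(drop=True) for i in range(iters)]
if end > 0:
    tail = data.tail(end)
    # repeat the last row until the tail window also reaches time_step rows
    pad = pd.DataFrame([tail.iloc[-1]] * (time_step - end))
    windows.append(pd.concat([tail, pad]).reset_index(drop=True))

batch = np.stack([w.values for w in windows])
print(batch.shape)  # (3, 16, 1): two full windows plus one padded tail window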

+ 94 - 0
models_processing/model_tf/tf_lstm_zone.py

@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :tf_lstm_zone.py
+# @Time      :2025/2/12 14:03
+# @Author    :David
+# @Company: shenyang JY
+
+from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten
+from tensorflow.keras.models import Model, load_model
+from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
+from tensorflow.keras import optimizers, regularizers
+from models_processing.model_tf.losses import region_loss
+import numpy as np
+from common.database_dml_koi import *
+from models_processing.model_tf.settings import set_deterministic
+from threading import Lock
+import argparse
+model_lock = Lock()
+set_deterministic(42)
+
+class TSHandler(object):
+    def __init__(self, logger, args):
+        self.logger = logger
+        self.opt = argparse.Namespace(**args)
+        self.model = None
+        self.model_params = None
+
+    def get_model(self, args):
+        """
+        Singleton pattern + thread lock, to avoid thread-safety issues when the model is loaded asynchronously
+        """
+        try:
+            with model_lock:
+                loss = region_loss(self.opt)
+                self.model, self.model_params = get_keras_model_from_mongo(args, {type(loss).__name__: loss})
+        except Exception as e:
+            self.logger.info("加载模型权重失败:{}".format(e.args))
+
+    @staticmethod
+    def get_keras_model(opt, time_series=1, lstm_type=1):
+        loss = region_loss(opt)
+        l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
+        l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
+        nwp_input = Input(shape=(opt.Model['time_step']*time_series, opt.Model['input_size']), name='nwp')
+
+        con1 = Conv1D(filters=64, kernel_size=5, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(nwp_input)
+        con1_p = MaxPooling1D(pool_size=5, strides=1, padding='valid', data_format='channels_last')(con1)
+        nwp_lstm = LSTM(units=opt.Model['hidden_size'], return_sequences=False, kernel_regularizer=l2_reg)(con1_p)
+        zone = Dense(len(opt.zone.keys()), name='zone')(nwp_lstm)
+        if lstm_type == 2:
+            output = Dense(opt.Model['time_step'], name='cdq_output')(zone)
+        else:
+            output = Dense(opt.Model['time_step']*time_series, name='cdq_output')(zone)
+
+        model = Model(nwp_input, [zone, output])
+        adam = optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
+        model.compile(loss={"zone": loss, "cdq_output": loss}, loss_weights={"zone": 0.7, "cdq_output": 0.3}, optimizer=adam)
+
+        return model
+
+    def train_init(self):
+        try:
+            # reinforced (incremental) training, supports model refinement
+            loss = region_loss(self.opt)
+            base_train_model, self.model_params = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
+            base_train_model.summary()
+            self.logger.info("已加载加强训练基础模型")
+            return base_train_model
+        except Exception as e:
+            self.logger.info("加载加强训练模型权重失败:{}".format(e.args))
+            return False
+
+    def training(self, model, train_and_valid_data):
+        model.summary()
+        train_x, train_y, valid_x, valid_y = train_and_valid_data
+        early_stop = EarlyStopping(monitor='val_loss', patience=self.opt.Model['patience'], mode='auto')
+        history = model.fit(train_x, train_y, batch_size=self.opt.Model['batch_size'], epochs=self.opt.Model['epoch'],
+                            verbose=2, validation_data=(valid_x, valid_y), callbacks=[early_stop], shuffle=False)
+        loss = np.round(history.history['loss'], decimals=5)
+        val_loss = np.round(history.history['val_loss'], decimals=5)
+        self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss)))
+        self.logger.info("训练集损失函数为:{}".format(loss))
+        self.logger.info("验证集损失函数为:{}".format(val_loss))
+        return model
+
+    def predict(self, test_x, batch_size=1):
+        result = self.model.predict(test_x, batch_size=batch_size)
+        self.logger.info("执行预测方法")
+        return result
+
+
+
+if __name__ == "__main__":
+    run_code = 0
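get_keras_model above stacks a Conv1D/MaxPooling1D/LSTM trunk, a per-zone Dense head named 'zone', and a station-level head named 'cdq_output' built on top of it, then trains both heads jointly with 0.7/0.3 loss weights. A self-contained sketch of that two-head pattern, using plain MSE in place of the project's region_loss and placeholder sizes instead of the values from lstm.yaml:

# Two-head Keras model mirroring the zone + cdq_output layout; all sizes are placeholders.
from tensorflow.keras import optimizers
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, LSTM, Dense
from tensorflow.keras.models import Model

time_step, input_size, n_zones = 16, 10, 4

nwp_input = Input(shape=(time_step, input_size), name='nwp')
x = Conv1D(filters=64, kernel_size=5, activation='relu')(nwp_input)
x = MaxPooling1D(pool_size=5, strides=1)(x)
x = LSTM(units=32, return_sequences=False)(x)
zone = Dense(n_zones, name='zone')(x)                # per-zone output
output = Dense(time_step, name='cdq_output')(zone)   # whole-plant output built on the zone head

model = Model(nwp_input, [zone, output])
model.compile(optimizer=optimizers.Adam(learning_rate=0.001),
              loss={'zone': 'mse', 'cdq_output': 'mse'},
              loss_weights={'zone': 0.7, 'cdq_output': 0.3})
model.summary()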

+ 142 - 0
models_processing/model_tf/tf_lstm_zone_pre.py

@@ -0,0 +1,142 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :tf_lstm_zone_pre.py
+# @Time      :2025/2/13 10:52
+# @Author    :David
+# @Company: shenyang JY
+import json, copy
+import numpy as np
+from flask import Flask, request, g
+import logging, argparse, traceback, os
+from common.database_dml_koi import *
+from common.processing_data_common import missing_features, str_to_list
+from data_processing.data_operation.custom_data_handler import CustomDataHandler
+from models_processing.model_tf.tf_lstm_zone import TSHandler
+from threading import Lock
+import time, yaml
+from copy import deepcopy
+model_lock = Lock()
+from itertools import chain
+from common.logs import Log
+# logger = Log('tf_bp').logger()
+logger = Log('tf_ts').logger
+np.random.seed(42)  # NumPy random seed
+# tf.set_random_seed(42)  # TensorFlow random seed
+app = Flask('tf_lstm_zone_pre——service')
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+with open(os.path.join(current_dir, 'lstm.yaml'), 'r', encoding='utf-8') as f:
+    global_config = yaml.safe_load(f)  # read-only global configuration
+
+@app.before_request
+def update_config():
+    # ------------ consolidate and merge the request parameters ------------
+    # deep-copy the global config, then overlay the request parameters
+    current_config = deepcopy(global_config)
+    request_args = request.values.to_dict()
+    # features rule: 1. if passed, parse and override  2. if not passed, keep the original value
+    request_args['features'] = request_args['features'].split(',') if 'features' in request_args else current_config['features']
+    # request values arrive as strings; cast time_series so the downstream time_step arithmetic works
+    request_args['time_series'] = int(request_args.get('time_series', 1))
+    current_config.update(request_args)
+
+    # store in the request context
+    g.opt = argparse.Namespace(**current_config)
+    g.dh = CustomDataHandler(logger, current_config)  # a fresh instance per request
+    g.ts = TSHandler(logger, current_config)
+
+@app.route('/tf_lstm_zone_predict', methods=['POST'])
+def model_prediction_lstm():
+    # record the program start time
+    start_time = time.time()
+    result = {}
+    success = 0
+    dh = g.dh
+    ts = g.ts
+    args = deepcopy(g.opt.__dict__)
+    logger.info("Program starts execution!")
+    try:
+        pre_data = get_data_from_mongo(args)
+        if int(args.get('algorithm_test', 0)):
+            field_mapping = {'clearsky_ghi': 'clearskyGhi', 'dni_calcd': 'dniCalcd','surface_pressure': 'surfacePressure'}
+            pre_data = pre_data.rename(columns=field_mapping)
+        feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
+        ts.opt.cap = round(target_scaler.transform(np.array([[float(args['cap'])]]))[0, 0], 2)
+        ts.get_model(args)
+        dh.opt.features = json.loads(ts.model_params)['Model']['features'].split(',')
+        scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler, time_series=args['time_series'])
+        # the zone model returns [zone, cdq_output]; inverse-scale only the whole-plant power head
+        res = list(chain.from_iterable(target_scaler.inverse_transform(ts.predict(scaled_pre_x)[1])))
+        pre_data['farm_id'] = args.get('farm_id', 'null')
+        if int(args.get('algorithm_test', 0)):
+            pre_data[args['model_name']] = res[:len(pre_data)]
+            pre_data.rename(columns={args['col_time']: 'dateTime'}, inplace=True)
+            pre_data = pre_data[['dateTime', 'farm_id', args['target'], args['model_name'], 'dq']]
+            pre_data = pre_data.melt(id_vars=['dateTime', 'farm_id', args['target']], var_name='model', value_name='power_forecast')
+            res_cols = ['dateTime', 'power_forecast', 'farm_id', args['target'], 'model']
+            if 'howLongAgo' in args:
+                pre_data['howLongAgo'] = int(args['howLongAgo'])
+                res_cols += ['howLongAgo']
+        else:
+            pre_data['power_forecast'] = res[:len(pre_data)]
+            pre_data.rename(columns={args['col_time']: 'date_time'}, inplace=True)
+            res_cols = ['date_time', 'power_forecast', 'farm_id']
+        pre_data = pre_data[res_cols]
+
+        pre_data.loc[:, 'power_forecast'] = pre_data['power_forecast'].round(2)
+        pre_data.loc[pre_data['power_forecast'] > float(args['cap']), 'power_forecast'] = float(args['cap'])
+        pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
+
+        insert_data_into_mongo(pre_data, args)
+        success = 1
+    except Exception as e:
+        my_exception = traceback.format_exc()
+        my_exception = my_exception.replace("\n", "\t")
+        result['msg'] = my_exception
+    end_time = time.time()
+
+    result['success'] = success
+    result['args'] = args
+    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+    print("Program execution ends!")
+    return result
+
+
+if __name__ == "__main__":
+    print("Program starts execution!")
+    from waitress import serve
+    serve(app, host="0.0.0.0", port=10125,
+          threads=8,  # number of worker threads (default 4, tune to the hardware)
+          channel_timeout=600  # connection timeout (seconds)
+          )
+    print("server start!")
+
+    # ------------------------ test code ------------------------
+    # args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
+    #              'model_table': 'j00083_model', 'mongodb_read_table': 'j00083_test', 'col_time': 'date_time', 'mongodb_write_table': 'j00083_rs',
+    #              'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
+    # args_dict['features'] = args_dict['features'].split(',')
+    # arguments.update(args_dict)
+    # dh = DataHandler(logger, arguments)
+    # ts = TSHandler(logger)
+    # opt = argparse.Namespace(**arguments)
+    #
+    # opt.Model['input_size'] = len(opt.features)
+    # pre_data = get_data_from_mongo(args_dict)
+    # feature_scaler, target_scaler = get_scaler_model_from_mongo(arguments)
+    # pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt)
+    # ts.get_model(arguments)
+    # result = ts.predict(pre_x)
+    # result1 = list(chain.from_iterable(target_scaler.inverse_transform([result.flatten()])))
+    # pre_data['power_forecast'] = result1[:len(pre_data)]
+    # pre_data['farm_id'] = 'J00083'
+    # pre_data['cdq'] = 1
+    # pre_data['dq'] = 1
+    # pre_data['zq'] = 1
+    # pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
+    # pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
+    #
+    # pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
+    # pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
+    # pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
+    #
+    # insert_data_into_mongo(pre_data, arguments)
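Because update_config overlays the request form fields onto lstm.yaml, the prediction service is driven entirely by POST parameters. A hypothetical client call against the waitress server on port 10125 follows; the database, table and farm names are illustrative placeholders, not real deployment values:

# Hypothetical call to the prediction endpoint; every value below is a placeholder.
import requests

payload = {
    'mongodb_database': 'some_db',
    'mongodb_read_table': 'nwp_table',
    'mongodb_write_table': 'result_table',
    'model_table': 'model_table',
    'scaler_table': 'scaler_table',
    'model_name': 'lstm_zone',
    'col_time': 'date_time',
    'farm_id': 'J00000',
    'cap': 100,
    'time_series': 1,
}
resp = requests.post('http://127.0.0.1:10125/tf_lstm_zone_predict', data=payload, timeout=600)
print(resp.json())  # {'success': 1, ...} on success; the traceback text lands in 'msg' otherwise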

+ 121 - 0
models_processing/model_tf/tf_lstm_zone_train.py

@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :tf_lstm_zone_train.py
+# @Time      :2025/2/13 10:52
+# @Author    :David
+# @Company: shenyang JY
+import json, copy
+import numpy as np
+from flask import Flask, request, jsonify, g
+import traceback, uuid
+import logging, argparse, os
+from data_processing.data_operation.custom_data_handler import CustomDataHandler
+import time, yaml, threading
+from copy import deepcopy
+from models_processing.model_tf.tf_lstm_zone import TSHandler
+from common.database_dml_koi import *
+from common.logs import Log
+logger = Log('tf_ts').logger
+np.random.seed(42)  # NumPy random seed
+app = Flask('tf_lstm_zone_train——service')
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+with open(os.path.join(current_dir, 'lstm.yaml'), 'r', encoding='utf-8') as f:
+    global_config = yaml.safe_load(f)  # read-only global configuration
+
+@app.before_request
+def update_config():
+    # ------------ consolidate and merge the request parameters ------------
+    # deep-copy the global config, then overlay the request parameters
+    current_config = deepcopy(global_config)
+    request_args = request.values.to_dict()
+    # features rule: 1. if passed, parse and override  2. if not passed, keep the original value
+    request_args['features'] = request_args['features'].split(',') if 'features' in request_args else current_config['features']
+    # request values arrive as strings; cast time_series so the downstream time_step arithmetic works
+    request_args['time_series'] = int(request_args.get('time_series', 1))
+    current_config.update(request_args)
+
+    # store in the request context
+    g.opt = argparse.Namespace(**current_config)
+    g.dh = CustomDataHandler(logger, current_config)  # a fresh instance per request
+    g.ts = TSHandler(logger, current_config)
+
+
+@app.route('/tf_lstm_zone_training', methods=['POST'])
+def model_training_lstm():
+    # record the program start time
+    start_time = time.time()
+    result = {}
+    success = 0
+    dh = g.dh
+    ts = g.ts
+    args = deepcopy(g.opt.__dict__)
+    logger.info("Program starts execution!")
+    try:
+        # ------------ fetch data and pre-process the training data ------------
+        train_data = get_data_from_mongo(args)
+        train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data, time_series=args['time_series'])
+        ts.opt.cap = round(scaled_cap, 2)
+        ts.opt.Model['input_size'] = len(dh.opt.features)
+        # ------------ train the model and save it ------------
+        # 1. reinforced-training mode: load the pre-trained model's feature list first, then re-process the training data
+        # 2. normal mode: pre-process the training data first, then build the model from the training-data features
+        model = ts.train_init() if ts.opt.Model['add_train'] else ts.get_keras_model(ts.opt, time_series=args['time_series'], lstm_type=1)
+        if ts.opt.Model['add_train']:
+            if model:
+                feas = json.loads(ts.model_params)['features']
+                if set(feas).issubset(set(dh.opt.features)):
+                    dh.opt.features = list(feas)
+                    train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data, time_series=args['time_series'])
+                else:
+                    model = ts.get_keras_model(ts.opt, time_series=args['time_series'], lstm_type=1)
+                    logger.info("训练数据特征,不满足,加强训练模型特征")
+            else:
+                model = ts.get_keras_model(ts.opt, time_series=args['time_series'], lstm_type=1)
+        ts_model = ts.training(model, [train_x, train_y, valid_x, valid_y])
+        args['Model']['features'] = ','.join(dh.opt.features)
+        args['params'] = json.dumps(args)
+        args['descr'] = '测试'
+        args['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+
+        insert_trained_model_into_mongo(ts_model, args)
+        insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args)
+        success = 1
+    except Exception as e:
+        my_exception = traceback.format_exc()
+        my_exception = my_exception.replace("\n", "\t")
+        result['msg'] = my_exception
+    end_time = time.time()
+    result['success'] = success
+    result['args'] = args
+    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+    print("Program execution ends!")
+    return result
+
+
+if __name__ == "__main__":
+    print("Program starts execution!")
+    from waitress import serve
+    serve(app, host="0.0.0.0", port=10124,
+          threads=8,  # number of worker threads (default 4, tune to the hardware)
+          channel_timeout=600  # connection timeout (seconds)
+          )
+    print("server start!")
+    # args_dict = {"mongodb_database": 'realtimeDq', 'scaler_table': 'j00600_scaler', 'model_name': 'lstm1',
+    # 'model_table': 'j00600_model', 'mongodb_read_table': 'j00600', 'col_time': 'dateTime',
+    # 'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
+    # args_dict['features'] = args_dict['features'].split(',')
+    # args.update(args_dict)
+    # dh = DataHandler(logger, args)
+    # ts = TSHandler(logger, args)
+    # opt = argparse.Namespace(**args)
+    # opt.Model['input_size'] = len(opt.features)
+    # train_data = get_data_from_mongo(args_dict)
+    # train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data)
+    # ts_model = ts.training([train_x, train_y, valid_x, valid_y])
+    #
+    # args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+    # args_dict['params'] = args
+    # args_dict['descr'] = '测试'
+    # insert_trained_model_into_mongo(ts_model, args_dict)
+    # insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args_dict)
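The training service mirrors the prediction service: the same before_request merge, a POST to /tf_lstm_zone_training, and the trained model plus both scalers written back to MongoDB. A hypothetical request sketch with placeholder values; the optional comma-separated features string overrides the list from lstm.yaml, as handled in update_config above:

# Hypothetical call to the training endpoint on port 10124; every value below is a placeholder.
import requests

payload = {
    'mongodb_database': 'some_db',
    'mongodb_read_table': 'history_table',
    'model_table': 'model_table',
    'scaler_table': 'scaler_table',
    'model_name': 'lstm_zone',
    'col_time': 'date_time',
    'target': 'realPower',
    'cap': 100,
    'time_series': 1,
    # optional: overrides the feature list from lstm.yaml
    'features': 'speed10,direction10,speed70,direction70',
}
resp = requests.post('http://127.0.0.1:10124/tf_lstm_zone_training', data=payload, timeout=600)
print(resp.json())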