David 3 months ago
parent
revision
e985e4ce77

+ 121 - 0
data_processing/data_operation/data_handler.py

@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :data_handler.py
+# @Time      :2025/1/8 14:56
+# @Author    :David
+# @Company: shenyang JY
+import numpy as np
+import pandas as pd
+from common.data_cleaning import *
+
+class DataHandler(object):
+    def __init__(self, logger, args):
+        self.logger = logger
+        self.opt = args.parse_args_and_yaml()
+
+    def get_train_data(self, df):
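+        """
+        Build windowed training samples from the normalized dataframe and
+        split them into training and validation sets.
+        """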
+        train_x, valid_x, train_y, valid_y = [], [], [], []
+        if len(df) < self.opt.Model["time_step"]:
+            self.logger.info("特征处理-训练数据-不满足time_step")
+        datax, datay = self.get_timestep_features(df, is_train=True)
+        if len(datax) < 10:
+            self.logger.info("特征处理-训练数据-无法进行最小分割")
+        tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+        train_x.extend(tx)
+        valid_x.extend(vx)
+        train_y.extend(ty)
+        valid_y.extend(vy)
+
+        train_y = np.concatenate([[y.iloc[:, 1].values for y in train_y]], axis=0)
+        valid_y = np.concatenate([[y.iloc[:, 1].values for y in valid_y]], axis=0)
+
+        train_x = [np.array([x[0].values for x in train_x]), np.array([x[1].values for x in train_x])]
+        valid_x = [np.array([x[0].values for x in valid_x]), np.array([x[1].values for x in valid_x])]
+
+        return train_x, valid_x, train_y, valid_y
+
+    def get_timestep_features(self, norm_data, is_train):   # windowing implemented with pandas operations
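+        """
+        Slide a window of length time_step over the frame: sample i is the
+        NWP-column block for rows [i, i + time_step), and its label is the
+        matching ['C_TIME', 'C_REAL_VALUE'] block over the same rows.
+        """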
+        time_step = self.opt.Model["time_step"]
+        feature_data = norm_data.reset_index(drop=True)
+        time_step_loc = time_step - 1
+        train_num = int(len(feature_data))
+        label_features = ['C_TIME', 'C_REAL_VALUE']
+        nwp_cs = self.opt.nwp_columns.copy()
+        if 'C_TIME' in nwp_cs:
+            nwp_cs.pop(nwp_cs.index('C_TIME'))
+        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]  # database fields, e.g. 'C_T', 'C_WS170'
+        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
+        features_x, features_y = [], []
+        self.logger.info("匹配环境前,{}组 -> ".format(len(nwp)))
+        for i, row in enumerate(zip(nwp, labels)):
+            features_x.append(row[0])
+            features_y.append(row[1])
+        self.logger.info("匹配环境后,{}组".format(len(features_x)))
+        return features_x, features_y
+
+    def fill_train_data(self, unite):
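+        """
+        Compute consecutive time gaps, split the series at long gaps, and
+        interpolate the shorter gaps when train_data_fill is enabled.
+        """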
+        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
+        unite['time_diff'] = unite['C_TIME'].diff()
+        dt_short = pd.Timedelta(minutes=15)
+        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
+        data_train = self.missing_time_splite(unite, dt_short, dt_long)
+        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
+        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
+        self.logger.info("再次测算,需要插值的总点数为:{}".format(miss_number))
+        if miss_number > 0 and self.opt.Model["train_data_fill"]:
+            data_train = self.data_fill(data_train)
+        return data_train
+
+    def missing_time_splite(self, df, dt_short, dt_long):
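+        """
+        Cut the frame into continuous segments wherever the gap between rows
+        reaches dt_long; gaps between dt_short and dt_long are only counted
+        here and are filled later by data_fill.
+        """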
+        n_long, n_short, n_points = 0, 0, 0
+        start_index = 0
+        dfs = []
+        for i in range(1, len(df)):
+            if df['time_diff'][i] >= dt_long:
+                df_long = df.iloc[start_index:i, :-1]
+                dfs.append(df_long)
+                start_index = i
+                n_long += 1
+            if df['time_diff'][i] > dt_short:
+                self.logger.info(f"{df['C_TIME'][i-1]} ~ {df['C_TIME'][i]}")
+                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
+                self.logger.info("缺失点数:{}".format(points))
+                if df['time_diff'][i] < dt_long:
+                    n_short += 1
+                    n_points += points
+                    self.logger.info("需要补值的点数:{}".format(points))
+        dfs.append(df.iloc[start_index:, :-1])
+        self.logger.info(f"数据总数:{len(df)}, 时序缺失的间隔:{n_short}, 其中,较长的时间间隔:{n_long}")
+        self.logger.info("需要补值的总点数:{}".format(n_points))
+        return dfs
+
+    def data_fill(self, dfs, test=False):
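+        """
+        Resample each continuous segment onto a 15-minute grid and fill the
+        inserted rows by linear interpolation, logging how many points were added.
+        """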
+        dfs_fill, inserts = [], 0
+        for i, df in enumerate(dfs):
+            df = rm_duplicated(df, self.logger)
+            df1 = df.set_index('C_TIME', inplace=False)
+            dff = df1.resample('15T').interpolate(method='linear')  # linear interpolation; other fill methods still need comparison
+            dff.reset_index(inplace=True)
+            points = len(dff) - len(df1)
+            dfs_fill.append(dff)
+            self.logger.info("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
+            inserts += points
+        name = "预测数据" if test is True else "训练集"
+        self.logger.info("{}分成了{}段,实际一共补值{}点".format(name, len(dfs_fill), inserts))
+        return dfs_fill
+
+    def train_valid_split(self, datax, datay, valid_rate, shuffle):
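+        """
+        Split the windowed samples into train/valid sets; with shuffle=False
+        the validation set is simply the chronological tail of the data.
+        """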
+        shuffle_index = np.random.permutation(len(datax))
+        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
+        valid_size = int(len(datax) * valid_rate)
+        valid_index = indexs[-valid_size:]
+        train_index = indexs[:-valid_size]
+        tx, vx, ty, vy = [], [], [], []
+        for i, data in enumerate(zip(datax, datay)):
+            if i in train_index:
+                tx.append(data[0])
+                ty.append(data[1])
+            elif i in valid_index:
+                vx.append(data[0])
+                vy.append(data[1])
+        return tx, vx, ty, vy

+ 275 - 0
models_processing/model_koi/nn_bp.py

@@ -0,0 +1,275 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2024/5/6 13:25
+# file: nn_bp.py
+# author: David
+# company: shenyang JY
+
+import numpy as np
+from sklearn.model_selection import train_test_split
+from flask import Flask, request
+import time
+import traceback
+import logging
+from sklearn.preprocessing import MinMaxScaler
+from io import BytesIO
+import joblib
+from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten, Dropout
+from tensorflow.keras.models import Model, Sequential, load_model
+from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
+from tensorflow.keras import optimizers, regularizers
+import tensorflow.keras.backend as K
+import tensorflow as tf
+
+from common.data_cleaning import cleaning
+from common.database_dml import *
+from common.processing_data_common import missing_features, str_to_list
+from data_processing.data_operation.data_handler import DataHandler
+from threading import Lock
+import random
+import matplotlib.pyplot as plt
+model_lock = Lock()
+
+
+app = Flask('model_training_bp_service')
+
+def draw_loss(history):
+    # plot training and validation loss
+    plt.figure(figsize=(20, 8))
+    plt.plot(history.history['loss'], label='Training Loss')
+    plt.plot(history.history['val_loss'], label='Validation Loss')
+    plt.title('Loss Curve')
+    plt.xlabel('Epochs')
+    plt.ylabel('Loss')
+    plt.legend()
+    plt.show()
+
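+# model.compile() in get_keras_model below references an `rmse` loss that is
+# neither defined nor imported in this file; a standard Keras-backend RMSE is
+# assumed here as a placeholder definition.
+def rmse(y_true, y_pred):
+    return K.sqrt(K.mean(K.square(y_pred - y_true)))
+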
+dh = DataHandler()
+def train_data_handler(data, args):
+    sleep_time = random.uniform(1, 20)  # random delay of 1-20 seconds to stagger concurrent training requests
+    time.sleep(sleep_time)
+    tf.keras.backend.clear_session()  # clear the current graph and session
+    # set random seeds for reproducibility
+    np.random.seed(42)  # NumPy seed
+    tf.random.set_seed(42)  # TensorFlow seed
+    col_time, features, target = args['col_time'], str_to_list(args['features']), args['target']
+    if 'is_limit' in data.columns:
+        data = data[data['is_limit'] == False]
+    # drop days whose average feature missing rate exceeds 20%
+    train_data = data.sort_values(by=col_time)
+    # preprocess the curtailment-filtered data: 1. clean nulls and outliers 2. fill missing values
+    train_data_cleaned = cleaning(train_data, '', logger, train_data.columns.tolist())
+    train_data = dh.fill_train_data(train_data_cleaned)
+    # create the scaler for features and target
+    train_scaler = MinMaxScaler(feature_range=(0, 1))
+    # scale the features and target
+    scaled_train_data = train_scaler.fit_transform(train_data)
+    # serialize the fitted scaler for storage
+    scaled_train_bytes = BytesIO()
+    joblib.dump(train_scaler, scaled_train_bytes)
+    scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
+    x_train, x_valid, y_train, y_valid = dh.get_train_data(scaled_train_data)
+    return x_train, x_valid, y_train, y_valid, scaled_train_bytes
+
+def pre_data_handler(data, args):
+    if 'is_limit' in data.columns:
+        data = data[data['is_limit'] == False]
+    features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
+    feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
+    pre_data = data.sort_values(by=col_time)
+    # preprocess the prediction data: 1. clean nulls and outliers 2. fill missing values
+    pre_data_cleaned = cleaning(pre_data, '', logger, pre_data.columns.tolist())
+    pre_data = dh.fill_train_data(pre_data_cleaned)
+    scaled_features = feature_scaler.transform(pre_data[features])
+    return scaled_features
+
+class NPHandler(object):
+    train = False
+
+    def __init__(self, log, args, graph, sess):
+        self.logger = log
+        self.graph = graph
+        self.sess = sess
+        self.opt = args.parse_args_and_yaml()
+        self.model = None
+
+    def get_model(self, args):
+        """
+        Singleton-style model loading guarded by a thread lock, to prevent thread-safety issues during asynchronous loading
+        """
+        try:
+            with model_lock:
+                # NPHandler.model = NPHandler.get_keras_model(opt)
+                self.model = get_h5_model_from_mongo(args)
+        except Exception as e:
+            print("加载模型权重失败:{}".format(e.args))
+
+    @staticmethod
+    def get_keras_model(opt):
+        # db_loss = NorthEastLoss(opt)
+        # south_loss = SouthLoss(opt)
+        l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
+        l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
+        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size_nwp']), name='nwp')
+        env_input = Input(shape=(opt.Model['his_points'], opt.Model['input_size_env']), name='env')
+
+        con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu',
+                      kernel_regularizer=l2_reg)(nwp_input)
+        d1 = Dense(32, activation='relu', name='d1', kernel_regularizer=l1_reg)(con1)
+        nwp = Dense(8, activation='relu', name='d2', kernel_regularizer=l1_reg)(d1)
+
+        output = Dense(opt.Model['output_size'], name='d5')(nwp)
+        model = Model([env_input, nwp_input], output)
+        adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7,
+                               amsgrad=True)
+        reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=5, verbose=1)
+        model.compile(loss=rmse, optimizer=adam)
+        return model
+
+    def train_init(self, opt, args):
+        try:
+            if opt.Model['add_train']:
+                # incremental training to support model refinement
+                base_train_model = get_h5_model_from_mongo(args)
+                base_train_model.summary()
+                self.logger.info("已加载加强训练基础模型")
+            else:
+                base_train_model = self.get_keras_model(opt)
+            return base_train_model
+        except Exception as e:
+            self.logger.info("加强训练加载模型权重失败:{}".format(e.args))
+
+    def training(self, opt, args, train_and_valid_data):
+        model = self.train_init(opt, args)
+        train_X, valid_X, train_Y, valid_Y = train_and_valid_data
+        print("----------", np.array(train_X[0]).shape)
+        print("++++++++++", np.array(train_X[1]).shape)
+        # weight_lstm_1, bias_lstm_1 = model.get_layer('d1').get_weights()
+        # print("weight_lstm_1 = ", weight_lstm_1)
+        # print("bias_lstm_1 = ", bias_lstm_1)
+
+        check_point = ModelCheckpoint(filepath='./var/' + 'fmi.h5', monitor='val_loss',
+                                      save_best_only=True, mode='auto')
+        early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
+        # tbCallBack = TensorBoard(log_dir='../figure',
+        #                          histogram_freq=0,
+        #                          write_graph=True,
+        #                          write_images=True)
+        history = model.fit(train_X, train_Y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2,
+                            validation_data=(valid_X, valid_Y), callbacks=[check_point, early_stop], shuffle=False)
+        loss = np.round(history.history['loss'], decimals=5)
+        val_loss = np.round(history.history['val_loss'], decimals=5)
+        self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss)))
+        self.logger.info("训练集损失函数为:{}".format(loss))
+        self.logger.info("验证集损失函数为:{}".format(val_loss))
+        return model
+
+    def predict(self, test_X, batch_size=1):
+        result = self.model.predict(test_X, batch_size=batch_size)
+        self.logger.info("执行预测方法")
+        return result
+
+def build_model(data, args):
+    # split into training and test sets
+    X_train, X_test, y_train, y_test = train_test_split(scaled_features, scaled_target, test_size=0.2, random_state=43)
+
+    # build the BP (fully connected) model
+    model = Sequential([
+        Dense(64, input_dim=X_train.shape[1], activation='relu'),  # input + hidden layer, 64 units
+        Dropout(0.2),
+        Dense(32, activation='relu'),  # hidden layer, 32 units
+        Dropout(0.3),  # Dropout layer: 30% of units are randomly dropped
+        Dense(1, activation='linear')  # output layer, 1 unit (regression)
+    ])
+
+    # compile the model
+    model.compile(optimizer='adam', loss='mean_squared_error')
+    # define EarlyStopping and ReduceLROnPlateau callbacks
+    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
+    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1)
+    # train the model on the GPU
+    with tf.device('/GPU:1'):
+        history = model.fit(X_train, y_train,
+                            epochs=100,
+                            batch_size=32,
+                            validation_data=(X_test, y_test),
+                            verbose=2,
+                            shuffle=False,
+                            callbacks=[early_stopping, reduce_lr])
+    draw_loss(history)
+    return model, feature_scaler_bytes, target_scaler_bytes
+
+
+@app.route('/model_training_bp', methods=['POST'])
+def model_training_bp():
+    # record the program start time
+    start_time = time.time()
+    result = {}
+    success = 0
+    nh = NPHandler()
+    print("Program starts execution!")
+    try:
+        args = request.values.to_dict()
+        print('args', args)
+        logger.info(args)
+        power_df = get_data_from_mongo(args)
+        train_x, valid_x, train_y, valid_y, scaled_train_bytes = train_data_handler(power_df, args)
+        np_model = nh.training(opt, args, [train_x, valid_x, train_y, valid_y])
+        model, feature_scaler_bytes, target_scaler_bytes = build_model(power_df, args)
+
+        insert_h5_model_into_mongo(np_model, scaled_train_bytes, args)
+        success = 1
+    except Exception as e:
+        my_exception = traceback.format_exc()
+        my_exception = my_exception.replace("\n", "\t")
+        result['msg'] = my_exception
+    end_time = time.time()
+
+    result['success'] = success
+    result['args'] = args
+    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+    print("Program execution ends!")
+    return result
+
+
+@app.route('/model_prediction_bp', methods=['POST'])
+def model_prediction_bp():
+    # record the program start time
+    start_time = time.time()
+    result = {}
+    success = 0
+    nh = NPHandler()
+    print("Program starts execution!")
+    try:
+        args = request.values.to_dict()
+        print('args', args)
+        logger.info(args)
+        power_df = get_data_from_mongo(args)
+        scaled_features = pre_data_handler(power_df, args)
+        nh.get_model(args)
+        pre_data = nh.predict(scaled_features)
+        insert_data_into_mongo(pre_data, args)
+        success = 1
+    except Exception as e:
+        my_exception = traceback.format_exc()
+        my_exception = my_exception.replace("\n", "\t")
+        result['msg'] = my_exception
+    end_time = time.time()
+
+    result['success'] = success
+    result['args'] = args
+    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+    print("Program execution ends!")
+    return result
+
+if __name__ == "__main__":
+    print("Program starts execution!")
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger("model_training_bp log")
+    from waitress import serve
+
+    serve(app, host="0.0.0.0", port=10103, threads=4)
+    print("server start!")