#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2024/5/6 13:25
# file: time_series.py
# author: David
# company: shenyang JY
import json, copy
import numpy as np
from flask import Flask, request
import traceback
import logging, argparse
from sklearn.preprocessing import MinMaxScaler
from io import BytesIO
import joblib
from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
from tensorflow.keras import optimizers, regularizers
import tensorflow.keras.backend as K
import tensorflow as tf
from common.data_cleaning import cleaning
from common.database_dml import *
from common.processing_data_common import missing_features, str_to_list
from data_processing.data_operation.data_handler import DataHandler
from threading import Lock
import time, yaml
import random
import matplotlib.pyplot as plt

model_lock = Lock()
from common.logs import Log
logger = logging.getLogger()
# logger = Log('models-processing').logger
np.random.seed(42)  # NumPy random seed
tf.random.set_seed(42)  # TensorFlow random seed (tf.random.set_random_seed is TF1-only)

app = Flask('nn_bp——service')

with app.app_context():
    with open('../model_koi/bp.yaml', 'r', encoding='utf-8') as f:
        arguments = yaml.safe_load(f)

dh = DataHandler(logger, arguments)


def train_data_handler(data, opt):
    col_time, features, target = opt.col_time, opt.features, opt.target
    if 'is_limit' in data.columns:
        data = data[data['is_limit'] == False]
    # Drop days whose average feature missing rate exceeds 20%
    data = missing_features(data, features, col_time)
    train_data = data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
    # Feature preprocessing after filtering curtailed records: 1. clean nulls/outliers 2. fill missing values
    train_data_cleaned = cleaning(train_data, 'nn_bp:features', logger, features)
    train_data = dh.fill_train_data(train_data_cleaned)
    # Build separate scalers for features and target
    train_scaler = MinMaxScaler(feature_range=(0, 1))
    target_scaler = MinMaxScaler(feature_range=(0, 1))
    # Scale features and target (MinMaxScaler scales each column independently)
    scaled_train_data = train_scaler.fit_transform(train_data[features])
    scaled_target = target_scaler.fit_transform(train_data[[target]])
    # Persist both scalers (not the scaled data) so the prediction stage can transform
    # inputs and inverse-transform outputs
    scaled_train_bytes = BytesIO()
    joblib.dump((train_scaler, target_scaler), scaled_train_bytes)
    scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
    x_train, x_valid, y_train, y_valid = dh.get_train_data(np.hstack([scaled_train_data, scaled_target]))
    return x_train, x_valid, y_train, y_valid, scaled_train_bytes


def pre_data_handler(data, args):
    if 'is_limit' in data.columns:
        data = data[data['is_limit'] == False]
    features = str_to_list(args['features'])
    time_steps = int(args['time_steps'])
    col_time, model_name, col_reserve = args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
    feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
    pre_data = data.sort_values(by=col_time)
    scaled_features = feature_scaler.transform(pre_data[features])
    return scaled_features
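
# Illustrative sketch (not part of the service): train_data_handler above dumps the two
# fitted scalers into a single BytesIO buffer, so get_scaler_model_from_mongo is assumed
# to restore them from the stored bytes roughly as below. `raw_bytes_from_mongo` is a
# hypothetical placeholder, not a helper that exists in this repo.
#
#   buf = BytesIO(raw_bytes_from_mongo)
#   feature_scaler, target_scaler = joblib.load(buf)
#   scaled_features = feature_scaler.transform(pre_data[features])   # forward transform for inference
#   power = target_scaler.inverse_transform(model_output)            # back to original units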
class BPHandler(object):
    def __init__(self, logger):
        self.logger = logger
        self.model = None

    def get_model(self, args):
        """
        Singleton pattern + thread lock, to keep asynchronous model loading thread-safe.
        """
        try:
            with model_lock:
                # NPHandler.model = NPHandler.get_keras_model(opt)
                self.model = get_h5_model_from_mongo(args)
        except Exception as e:
            self.logger.info("加载模型权重失败:{}".format(e.args))

    @staticmethod
    def get_keras_model(opt):
        # db_loss = NorthEastLoss(opt)
        # south_loss = SouthLoss(opt)
        l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
        l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size_nwp']), name='nwp')
        env_input = Input(shape=(opt.Model['his_points'], opt.Model['input_size_env']), name='env')

        con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu',
                      kernel_regularizer=l2_reg)(nwp_input)
        d1 = Dense(32, activation='relu', name='d1', kernel_regularizer=l1_reg)(con1)
        nwp = Dense(8, activation='relu', name='d2', kernel_regularizer=l1_reg)(d1)
        output = Dense(opt.Model['output_size'], name='d5')(nwp)
        model = Model([env_input, nwp_input], output)
        adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999,
                               epsilon=1e-7, amsgrad=True)
        reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=5, verbose=1)

        def rmse(y_true, y_pred):
            # 'rmse' is not a built-in Keras loss string, so define it explicitly
            return K.sqrt(K.mean(K.square(y_pred - y_true)))

        model.compile(loss=rmse, optimizer=adam)
        return model

    def train_init(self, opt):
        try:
            if opt.Model['add_train']:
                # Incremental training: fine-tune on top of an existing base model
                base_train_model = get_h5_model_from_mongo(vars(opt))
                base_train_model.summary()
                self.logger.info("已加载加强训练基础模型")
            else:
                base_train_model = self.get_keras_model(opt)
            return base_train_model
        except Exception as e:
            self.logger.info("加强训练加载模型权重失败:{}".format(e.args))

    def training(self, opt, train_and_valid_data):
        K.clear_session()  # replaces TF1's tf.reset_default_graph(); reset Keras state before building
        model = self.train_init(opt)
        train_x, valid_x, train_y, valid_y = train_and_valid_data  # unpack in the same order the callers pass
        print("----------", np.array(train_x[0]).shape)
        print("++++++++++", np.array(train_x[1]).shape)

        check_point = ModelCheckpoint(filepath='./var/' + 'fmi.h5', monitor='val_loss',
                                      save_best_only=True, mode='auto')
        early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
        history = model.fit(train_x, train_y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'],
                            verbose=2, validation_data=(valid_x, valid_y),
                            callbacks=[check_point, early_stop], shuffle=False)
        loss = np.round(history.history['loss'], decimals=5)
        val_loss = np.round(history.history['val_loss'], decimals=5)
        self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss)))
        self.logger.info("训练集损失函数为:{}".format(loss))
        self.logger.info("验证集损失函数为:{}".format(val_loss))
        return model

    def predict(self, test_X, batch_size=1):
        result = self.model.predict(test_X, batch_size=batch_size)
        self.logger.info("执行预测方法")
        return result


@app.route('/model_training_bp', methods=['POST'])
def model_training_bp():
    # Record the start time of the request
    start_time = time.time()
    result = {}
    success = 0
    bp = BPHandler(logger)
    print("Program starts execution!")
    args = {}
    try:
        args_dict = request.values.to_dict()
        args = copy.deepcopy(arguments)  # dicts have no .deepcopy(); use copy.deepcopy
        opt = argparse.Namespace(**args)
        logger.info(args_dict)
        train_data = get_data_from_mongo(args_dict)
        train_x, valid_x, train_y, valid_y, scaled_train_bytes = train_data_handler(train_data, opt)
        bp_model = bp.training(opt, [train_x, valid_x, train_y, valid_y])
        args_dict['params'] = json.dumps(args)
        args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        insert_trained_model_into_mongo(bp_model, args_dict)
        insert_scaler_model_into_mongo(scaled_train_bytes, args_dict)
        success = 1
    except Exception as e:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")  # str.replace returns a new string
        result['msg'] = my_exception
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
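
# Illustrative client call (not executed by this module): assuming the service is served on
# port 10103 (the port used in the commented-out waitress call in the __main__ block below),
# training could be triggered roughly as follows. The table and model names mirror the
# __main__ test block and are examples only.
#
#   import requests
#   resp = requests.post('http://127.0.0.1:10103/model_training_bp', data={
#       'mongodb_database': 'david_test',
#       'mongodb_read_table': 'j00083',
#       'model_table': 'j00083_model',
#       'scaler_table': 'j00083_scaler',
#       'model_name': 'bp1.0.test',
#   })
#   print(resp.json())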
@app.route('/model_prediction_bp', methods=['POST'])
def model_prediction_bp():
    # Record the start time of the request
    start_time = time.time()
    result = {}
    success = 0
    bp = BPHandler(logger)
    print("Program starts execution!")
    args = {}
    try:
        params_dict = request.values.to_dict()
        args = copy.deepcopy(arguments)  # dicts have no .deepcopy(); use copy.deepcopy
        args.update(params_dict)
        opt = argparse.Namespace(**args)
        print('args', args)
        logger.info(args)
        predict_data = get_data_from_mongo(args)
        scaled_features = pre_data_handler(predict_data, args)
        bp.get_model(args)  # load the trained model before predicting
        predict_result = bp.predict(scaled_features)  # keep the response dict `result` separate
        insert_data_into_mongo(predict_result, args)
        success = 1
    except Exception as e:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")  # str.replace returns a new string
        result['msg'] = my_exception
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("model_training_bp log")
    from waitress import serve

    # serve(app, host="0.0.0.0", port=10103, threads=4)
    print("server start!")

    # Local smoke test: run one training pass against a fixed test table instead of starting the server
    bp = BPHandler(logger)
    args = copy.deepcopy(arguments)  # was copy.deepcopy(bp), which copied the handler by mistake
    opt = argparse.Namespace(**arguments)
    logger.info(args)
    args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
                 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083'}
    train_data = get_data_from_mongo(args_dict)
    train_x, valid_x, train_y, valid_y, scaled_train_bytes = train_data_handler(train_data, opt)
    bp_model = bp.training(opt, [train_x, valid_x, train_y, valid_y])
    insert_trained_model_into_mongo(bp_model, args_dict)
    insert_scaler_model_into_mongo(scaled_train_bytes, args_dict)
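
# Illustrative deployment/prediction sketch (not executed): to actually serve the API, the
# commented-out waitress call above can be enabled, and a prediction request could then be
# issued in the same style as the training call. Host, port and table names are examples;
# features, time_steps, col_time and col_reserve are expected to come from bp.yaml, optionally
# overridden by request parameters.
#
#   serve(app, host="0.0.0.0", port=10103, threads=4)
#
#   import requests
#   resp = requests.post('http://127.0.0.1:10103/model_prediction_bp', data={
#       'mongodb_database': 'david_test',
#       'mongodb_read_table': 'j00083',
#       'model_table': 'j00083_model',
#       'scaler_table': 'j00083_scaler',
#       'model_name': 'bp1.0.test',
#   })
#   print(resp.json())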