- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # time: 2024/5/6 13:25
- # file: time_series.py
- # author: David
- # company: shenyang JY
- import json, copy
- import numpy as np
- from flask import Flask, request
- import time
- import traceback
- import logging, argparse
- from sklearn.preprocessing import MinMaxScaler
- from io import BytesIO
- import joblib
- from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten
- from tensorflow.keras.models import Model, load_model
- from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
- from tensorflow.keras import optimizers, regularizers
- import tensorflow.keras.backend as K
- import tensorflow as tf
- from common.data_cleaning import cleaning
- from common.database_dml import get_data_from_mongo, get_h5_model_from_mongo, get_scaler_model_from_mongo, insert_trained_model_into_mongo, insert_scaler_model_into_mongo, insert_data_into_mongo
- from common.processing_data_common import missing_features, str_to_list
- from data_processing.data_operation.data_handler import DataHandler
- from threading import Lock
- import yaml
- import random
- import matplotlib.pyplot as plt
- model_lock = Lock()
- from common.logs import Log
- logger = logging.getLogger()
- # logger = Log('models-processing').logger
- np.random.seed(42)  # NumPy random seed
- tf.random.set_seed(42)  # TensorFlow random seed (TF2 API)
- app = Flask('nn_bp_service')
- with app.app_context():
- with open('../model_koi/bp.yaml', 'r', encoding='utf-8') as f:
- arguments = yaml.safe_load(f)
- dh = DataHandler(logger, arguments)
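- # Illustrative sketch of the expected ../model_koi/bp.yaml contents, inferred from the
- # opt.Model[...] and opt.<attr> accesses below; the values shown are placeholders, not
- # the real configuration:
- #   Model:
- #     time_step: 16            # NWP sequence length (assumed value)
- #     his_points: 16           # env/history sequence length (assumed value)
- #     input_size_nwp: 24       # number of NWP features (assumed value)
- #     input_size_env: 5        # number of env features (assumed value)
- #     output_size: 16
- #     learning_rate: 0.001
- #     lambda_value_1: 0.0
- #     lambda_value_2: 0.0
- #     patience: 10
- #     batch_size: 64
- #     epoch: 100
- #     add_train: false
- #   col_time: dateTime         # timestamp column (assumed name)
- #   features: [...]
- #   target: realPower          # target column (assumed name)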
- def train_data_handler(data, opt):
- col_time, features, target = opt.col_time, opt.features, opt.target
- if 'is_limit' in data.columns:
- data = data[data['is_limit'] == False]
- # Drop days whose average feature missing rate exceeds 20%
- data = missing_features(data, features, col_time)
- train_data = data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
- # Feature preprocessing after curtailment filtering: 1. clean nulls/outliers 2. fill missing values
- train_data_cleaned = cleaning(train_data, 'nn_bp:features', logger, features)
- train_data = dh.fill_train_data(train_data_cleaned)
- # Create the scaler for features and target
- train_scaler = MinMaxScaler(feature_range=(0, 1))
- # Scale features and target together
- scaled_train_data = train_scaler.fit_transform(train_data[features+[target]])
- # Serialize the fitted scaler (not the scaled array) so prediction can reuse it
- scaled_train_bytes = BytesIO()
- joblib.dump(train_scaler, scaled_train_bytes)
- scaled_train_bytes.seek(0) # Reset pointer to the beginning of the byte stream
- x_train, x_valid, y_train, y_valid = dh.get_train_data(scaled_train_data)
- return x_train, x_valid, y_train, y_valid, scaled_train_bytes
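- # pre_data_handler: prediction-time preprocessing. It drops curtailment-limited rows,
- # loads the feature/target scalers previously stored in MongoDB, sorts by time and
- # returns the scaled feature matrix used for inference.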
- def pre_data_handler(data, args):
- if 'is_limit' in data.columns:
- data = data[data['is_limit'] == False]
- features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
- feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
- pre_data = data.sort_values(by=col_time)
- scaled_features = feature_scaler.transform(pre_data[features])
- return scaled_features
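- # BPHandler wraps the model lifecycle for this service: thread-safe loading of a trained
- # model from MongoDB, building a fresh network, (incremental) training, and prediction.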
- class BPHandler(object):
- def __init__(self, logger):
- self.logger = logger
- self.model = None
- def get_model(self, args):
- """
- Singleton pattern plus a thread lock, to avoid thread-safety issues when the model is loaded asynchronously
- """
- try:
- with model_lock:
- # NPHandler.model = NPHandler.get_keras_model(opt)
- self.model = get_h5_model_from_mongo(args)
- except Exception as e:
- self.logger.info("加载模型权重失败:{}".format(e.args))
- @staticmethod
- def get_keras_model(opt):
- # db_loss = NorthEastLoss(opt)
- # south_loss = SouthLoss(opt)
- l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
- l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
- nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size_nwp']), name='nwp')
- env_input = Input(shape=(opt.Model['his_points'], opt.Model['input_size_env']), name='env')
- con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu',
- kernel_regularizer=l2_reg)(nwp_input)
- d1 = Dense(32, activation='relu', name='d1', kernel_regularizer=l1_reg)(con1)
- nwp = Dense(8, activation='relu', name='d2', kernel_regularizer=l1_reg)(d1)
- output = Dense(opt.Model['output_size'], name='d5')(nwp)
- model = Model([env_input, nwp_input], output)
- adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7,
- amsgrad=True)
- reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=5, verbose=1)
- model.compile(loss='mse', optimizer=adam)  # Keras has no built-in 'rmse' loss string; use MSE
- return model
- def train_init(self, opt):
- try:
- if opt.Model['add_train']:
- # Incremental training: fine-tune the previously stored model
- base_train_model = get_h5_model_from_mongo(vars(opt))
- base_train_model.summary()
- self.logger.info("已加载加强训练基础模型")
- else:
- base_train_model = self.get_keras_model(opt)
- return base_train_model
- except Exception as e:
- self.logger.info("加强训练加载模型权重失败:{}".format(e.args))
- def training(self, opt, train_and_valid_data):
- model = self.train_init(opt)
- K.clear_session()  # reset Keras/TensorFlow state between trainings (tf.reset_default_graph is TF1-only)
- train_x, valid_x, train_y, valid_y = train_and_valid_data
- print("----------", np.array(train_x[0]).shape)
- print("++++++++++", np.array(train_x[1]).shape)
- check_point = ModelCheckpoint(filepath='./var/' + 'fmi.h5', monitor='val_loss',
- save_best_only=True, mode='auto')
- early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
- history = model.fit(train_x, train_y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2,
- validation_data=(valid_x, valid_y), callbacks=[check_point, early_stop], shuffle=False)
- loss = np.round(history.history['loss'], decimals=5)
- val_loss = np.round(history.history['val_loss'], decimals=5)
- self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss)))
- self.logger.info("训练集损失函数为:{}".format(loss))
- self.logger.info("验证集损失函数为:{}".format(val_loss))
- return model
- def predict(self, test_X, batch_size=1):
- result = self.model.predict(test_X, batch_size=batch_size)
- self.logger.info("执行预测方法")
- return result
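- # Illustrative training request (parameter names taken from the __main__ example below;
- # host/port assume the commented-out waitress serve() call):
- #   requests.post('http://localhost:10103/model_training_bp',
- #                 data={'mongodb_database': 'david_test', 'mongodb_read_table': 'j00083',
- #                       'model_table': 'j00083_model', 'scaler_table': 'j00083_scaler',
- #                       'model_name': 'bp1.0.test'})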
- @app.route('/model_training_bp', methods=['POST'])
- def model_training_bp():
- # Record the start time
- start_time = time.time()
- result = {}
- success = 0
- bp = BPHandler(logger)
- print("Program starts execution!")
- try:
- args_dict = request.values.to_dict()
- args = copy.deepcopy(arguments)
- opt = argparse.Namespace(**args)
- logger.info(args_dict)
- train_data = get_data_from_mongo(args_dict)
- train_x, valid_x, train_y, valid_y, scaled_train_bytes = train_data_handler(train_data, opt)
- bp_model = bp.training(opt, [train_x, valid_x, train_y, valid_y])
- args_dict['params'] = json.dumps(args)
- args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
- insert_trained_model_into_mongo(bp_model, args_dict)
- insert_scaler_model_into_mongo(scaled_train_bytes, args_dict)
- success = 1
- except Exception as e:
- my_exception = traceback.format_exc()
- my_exception = my_exception.replace("\n", "\t")
- result['msg'] = my_exception
- end_time = time.time()
- result['success'] = success
- result['args'] = args
- result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
- result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
- print("Program execution ends!")
- return result
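- # /model_prediction_bp mirrors the training route: it merges request parameters into the
- # YAML defaults, pulls prediction data from MongoDB, scales it with the stored scaler,
- # loads the trained model and writes the predictions back to MongoDB.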
- @app.route('/model_prediction_bp', methods=['POST'])
- def model_prediction_bp():
- # Record the start time
- start_time = time.time()
- result = {}
- success = 0
- bp = BPHandler(logger)
- print("Program starts execution!")
- try:
- params_dict = request.values.to_dict()
- args = copy.deepcopy(arguments)
- args.update(params_dict)
- opt = argparse.Namespace(**args)
- print('args', args)
- logger.info(args)
- predict_data = get_data_from_mongo(args)
- scaled_features = pre_data_handler(predict_data, args)
- bp.get_model(args)  # load the trained model from MongoDB before predicting
- res = bp.predict(scaled_features)
- insert_data_into_mongo(res, args)
- success = 1
- except Exception as e:
- my_exception = traceback.format_exc()
- my_exception = my_exception.replace("\n", "\t")
- result['msg'] = my_exception
- end_time = time.time()
- result['success'] = success
- result['args'] = args
- result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
- result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
- print("Program execution ends!")
- return result
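- # When run as a script, the HTTP server is skipped (serve() is commented out) and a single
- # offline end-to-end training run is executed against the test collections listed below.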
- if __name__ == "__main__":
- print("Program starts execution!")
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- logger = logging.getLogger("model_training_bp log")
- from waitress import serve
- # serve(app, host="0.0.0.0", port=10103, threads=4)
- print("server start!")
- bp = BPHandler(logger)
- args = copy.deepcopy(arguments)
- opt = argparse.Namespace(**arguments)
- logger.info(args)
- args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
- 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083'}
- train_data = get_data_from_mongo(args_dict)
- train_x, valid_x, train_y, valid_y, scaled_train_bytes = train_data_handler(train_data, opt)
- bp_model = bp.training(opt, [train_x, valid_x, train_y, valid_y])
- insert_trained_model_into_mongo(bp_model, args_dict)
- insert_scaler_model_into_mongo(scaled_train_bytes, args_dict)