#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2024/5/6 13:25
# file: nn_bp.py
# author: David
# company: shenyang JY
import json, copy
import numpy as np
from flask import Flask, request
import time
import traceback
import logging, argparse
from sklearn.preprocessing import MinMaxScaler
from io import BytesIO
import joblib
from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
from tensorflow.keras import optimizers, regularizers
import tensorflow.keras.backend as K
import tensorflow as tf
from bson.decimal128 import Decimal128
from common.data_cleaning import cleaning, key_field_row_cleaning
from common.database_dml import *
from common.processing_data_common import missing_features, str_to_list
from data_processing.data_operation.data_handler import DataHandler
from threading import Lock
import yaml
import random, numbers
import matplotlib.pyplot as plt
from common.logs import Log

model_lock = Lock()
logger = logging.getLogger()
# logger = Log('models-processing').logger
np.random.seed(42)  # NumPy random seed
# tf.set_random_seed(42)  # TensorFlow random seed

app = Flask('nn_bp——service')

with app.app_context():
    with open('../model_koi/bp.yaml', 'r', encoding='utf-8') as f:
        arguments = yaml.safe_load(f)
    dh = DataHandler(logger, arguments)


def train_data_handler(data, opt):
    """
    Training data preprocessing:
    cleaning + imputation + normalization
    Args:
        data: data loaded from MongoDB
        opt: argument namespace
    Returns:
        x_train, x_valid, y_train, y_valid, scaled_train_bytes
    """
    col_time, features, target = opt.col_time, opt.features, opt.target
    # Drop records flagged as curtailment (power-limit) periods
    if 'is_limit' in data.columns:
        data = data[data['is_limit'] == False]
    # Select the feature columns and convert them to numeric values
    train_data = data[[col_time] + features + [target]]
    # Drop days whose average feature missing rate exceeds 20%
    train_data = missing_features(train_data, features, col_time)
    train_data = train_data.sort_values(by=col_time)
    # train_data = train_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
    # Preprocess the curtailment-cleaned data: 1. remove nulls/outliers 2. impute missing values
    train_data_cleaned = key_field_row_cleaning(train_data, features + [target], logger)
    train_data_cleaned = train_data_cleaned.applymap(
        lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
    # Create the scaler for features and target
    train_scaler = MinMaxScaler(feature_range=(0, 1))
    # Scale features and target
    scaled_train_data = train_scaler.fit_transform(train_data_cleaned[features + [target]])
    train_data_cleaned[features + [target]] = scaled_train_data
    train_datas = dh.fill_train_data(train_data_cleaned, col_time)
    # Serialize the fitted scaler (not the scaled array) so the prediction side can reuse it
    scaled_train_bytes = BytesIO()
    joblib.dump(train_scaler, scaled_train_bytes)
    scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
    x_train, x_valid, y_train, y_valid = dh.get_train_data(train_datas, col_time, features, target)
    return x_train, x_valid, y_train, y_valid, scaled_train_bytes


def pre_data_handler(data, args):
    if 'is_limit' in data.columns:
        data = data[data['is_limit'] == False]
    features = str_to_list(args['features'])
    time_steps = int(args['time_steps'])
    col_time = args['col_time']
    model_name = args['model_name']
    col_reserve = str_to_list(args['col_reserve'])
    feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
    pre_data = data.sort_values(by=col_time)
    scaled_features = feature_scaler.transform(pre_data[features])
    return scaled_features


class BPHandler(object):
    def __init__(self, logger):
        self.logger = logger
        self.model = None

    def get_model(self, args):
        """
        Singleton + thread lock, to avoid thread-safety issues when the model is loaded asynchronously
        """
        try:
            with model_lock:
                # NPHandler.model = NPHandler.get_keras_model(opt)
                self.model = get_h5_model_from_mongo(args)
        except Exception as e:
            self.logger.info("Failed to load model weights: {}".format(e.args))

    @staticmethod
    def get_keras_model(opt):
        # db_loss = NorthEastLoss(opt)
        # south_loss = SouthLoss(opt)
        from models_processing.losses.loss_cdq import rmse
        l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
        l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size']), name='nwp')
        con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(nwp_input)
        d1 = Dense(32, activation='relu', name='d1', kernel_regularizer=l1_reg)(con1)
        nwp = Dense(8, activation='relu', name='d2', kernel_regularizer=l1_reg)(d1)
        output = Dense(opt.Model['output_size'], name='d5')(nwp)
        model = Model(nwp_input, output)
        adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
        reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=5, verbose=1)
        model.compile(loss=rmse, optimizer=adam)
        return model
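
    # Shape sketch (assumption, inferred from the layers above): with kernel_size=1
    # convolutions and per-timestep Dense layers, this network maps an input of shape
    # (batch, time_step, input_size) to an output of shape (batch, time_step, output_size),
    # i.e. one prediction per NWP time step.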

    def train_init(self, opt):
        try:
            if opt.Model['add_train']:
                # Incremental training to support model revision
                base_train_model = get_h5_model_from_mongo(vars(opt))
                base_train_model.summary()
                self.logger.info("Loaded base model for incremental training")
            else:
                base_train_model = self.get_keras_model(opt)
            return base_train_model
        except Exception as e:
            self.logger.info("Failed to load model weights for incremental training: {}".format(e.args))

    def training(self, opt, train_and_valid_data):
        model = self.train_init(opt)
        # tf.reset_default_graph()  # clear the default graph
        train_x, train_y, valid_x, valid_y = train_and_valid_data
        print("----------", np.array(train_x[0]).shape)
        print("++++++++++", np.array(train_x[1]).shape)
        check_point = ModelCheckpoint(filepath='./var/' + 'fmi.h5', monitor='val_loss', save_best_only=True, mode='auto')
        early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
        history = model.fit(train_x, train_y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2, validation_data=(valid_x, valid_y), callbacks=[check_point, early_stop], shuffle=False)
        loss = np.round(history.history['loss'], decimals=5)
        val_loss = np.round(history.history['val_loss'], decimals=5)
        self.logger.info("----- Model training ran for {} epochs -----".format(len(loss)))
        self.logger.info("Training loss: {}".format(loss))
        self.logger.info("Validation loss: {}".format(val_loss))
        return model

    def predict(self, test_X, batch_size=1):
        result = self.model.predict(test_X, batch_size=batch_size)
        self.logger.info("Prediction executed")
        return result


@app.route('/model_training_bp', methods=['POST'])
def model_training_bp():
    # Record the start time of the request
    start_time = time.time()
    result = {}
    success = 0
    bp = BPHandler(logger)
    print("Program starts execution!")
    try:
        args_dict = request.values.to_dict()
        args = copy.deepcopy(arguments)
        opt = argparse.Namespace(**args)
        logger.info(args_dict)
        train_data = get_data_from_mongo(args_dict)
        train_x, valid_x, train_y, valid_y, scaled_train_bytes = train_data_handler(train_data, opt)
        bp_model = bp.training(opt, [train_x, train_y, valid_x, valid_y])
        args_dict['params'] = json.dumps(args)
        args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        insert_trained_model_into_mongo(bp_model, args_dict)
        insert_scaler_model_into_mongo(scaled_train_bytes, args_dict)
        success = 1
    except Exception as e:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
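
# Usage sketch (assumption): if the service is exposed with the waitress call shown in
# __main__ (port 10103), a training job could be triggered with a form-encoded POST.
# The field names mirror the args_dict used in __main__ below:
#   curl -X POST http://127.0.0.1:10103/model_training_bp \
#        -d "mongodb_database=david_test" -d "mongodb_read_table=j00083" \
#        -d "model_table=j00083_model" -d "scaler_table=j00083_scaler" \
#        -d "model_name=bp1.0.test" -d "col_time=dateTime"
# The route reads these fields via request.values.to_dict() to locate the training data
# and to name the stored model/scaler documents; training hyperparameters come from bp.yaml.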


@app.route('/model_prediction_bp', methods=['POST'])
def model_prediction_bp():
    # Record the start time of the request
    start_time = time.time()
    result = {}
    success = 0
    bp = BPHandler(logger)
    print("Program starts execution!")
    try:
        params_dict = request.values.to_dict()
        args = copy.deepcopy(arguments)
        args.update(params_dict)
        opt = argparse.Namespace(**args)
        print('args', args)
        logger.info(args)
        predict_data = get_data_from_mongo(args)
        scaled_features = pre_data_handler(predict_data, args)
        bp.get_model(args)
        res = bp.predict(scaled_features)
        insert_data_into_mongo(res, args)
        success = 1
    except Exception as e:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
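
# Usage sketch (assumption): a prediction request posts the same MongoDB locators; the
# route merges them into the YAML defaults (args.update(params_dict)) before loading the
# stored model and scaler. Field values are illustrative:
#   curl -X POST http://127.0.0.1:10103/model_prediction_bp \
#        -d "mongodb_database=david_test" -d "mongodb_read_table=j00083" \
#        -d "model_table=j00083_model" -d "scaler_table=j00083_scaler" \
#        -d "model_name=bp1.0.test"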


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("model_training_bp log")
    from waitress import serve
    # serve(app, host="0.0.0.0", port=10103, threads=4)
    print("server start!")
    bp = BPHandler(logger)
    args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
                 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',
                 'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
    args_dict['features'] = args_dict['features'].split(',')
    arguments.update(args_dict)
    opt = argparse.Namespace(**arguments)
    opt.Model['input_size'] = len(opt.features)
    train_data = get_data_from_mongo(args_dict)
    train_x, valid_x, train_y, valid_y, scaled_train_bytes = train_data_handler(train_data, opt)
    bp_model = bp.training(opt, [train_x, train_y, valid_x, valid_y])
    insert_trained_model_into_mongo(bp_model, args_dict)
    insert_scaler_model_into_mongo(scaled_train_bytes, args_dict)