#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2024/5/6 13:25
# file: nn_bp.py
# author: David
# company: shenyang JY
import time
import random
import traceback
import logging
from io import BytesIO
from threading import Lock

import joblib
import numpy as np
import matplotlib.pyplot as plt
from flask import Flask, request
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten
from tensorflow.keras.models import Model, Sequential, load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
from tensorflow.keras import optimizers, regularizers

from common.data_cleaning import cleaning
from common.database_dml import (get_data_from_mongo, get_h5_model_from_mongo, get_scaler_model_from_mongo,
                                 insert_h5_model_into_mongo, insert_data_into_mongo)
from common.processing_data_common import missing_features, str_to_list
from data_processing.data_operation.data_handler import DataHandler

model_lock = Lock()
app = Flask('model_training_bp_service')

def draw_loss(history):
    # Plot training and validation loss curves
    plt.figure(figsize=(20, 8))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Loss Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
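
# `rmse` is referenced by model.compile() in NPHandler.get_keras_model() but was
# never defined in this file; a minimal Keras RMSE loss, assuming that was the intent:
def rmse(y_true, y_pred):
    return K.sqrt(K.mean(K.square(y_pred - y_true)))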

dh = DataHandler()

def train_data_handler(data, args):
    # Sleep a random 1-20 seconds to stagger concurrent training jobs
    sleep_time = random.uniform(1, 20)
    time.sleep(sleep_time)
    tf.keras.backend.clear_session()  # clear the current graph and session
    # Fix random seeds for reproducibility
    np.random.seed(42)      # NumPy seed
    tf.random.set_seed(42)  # TensorFlow seed
    col_time, features, target = args['col_time'], str_to_list(args['features']), args['target']
    if 'is_limit' in data.columns:
        data = data[data['is_limit'] == False]
    # Drop days whose average feature missing rate exceeds 20%
    train_data = data.sort_values(by=col_time)
    # Preprocess the curtailment-filtered data: 1. clean nulls/outliers 2. fill missing values
    train_data_cleaned = cleaning(train_data, '', logger, train_data.columns.tolist())
    train_data = dh.fill_train_data(train_data_cleaned)
    # Create the scaler for features and target
    train_scaler = MinMaxScaler(feature_range=(0, 1))
    # Scale features and target
    scaled_train_data = train_scaler.fit_transform(train_data)
    # Serialize the fitted scaler (the original dumped the scaled data, but the
    # prediction path loads a scaler back, so the scaler itself is saved here)
    scaled_train_bytes = BytesIO()
    joblib.dump(train_scaler, scaled_train_bytes)
    scaled_train_bytes.seek(0)  # reset pointer to the beginning of the byte stream
    x_train, x_valid, y_train, y_valid = dh.get_train_data(scaled_train_data)
    return x_train, x_valid, y_train, y_valid, scaled_train_bytes
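
# Sketch (assumption): get_scaler_model_from_mongo() on the prediction side is
# expected to rebuild the scaler from the bytes serialized above, e.g.:
#   scaled_train_bytes.seek(0)
#   restored_scaler = joblib.load(scaled_train_bytes)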

def pre_data_handler(data, args):
    if 'is_limit' in data.columns:
        data = data[data['is_limit'] == False]
    features = str_to_list(args['features'])
    time_steps = int(args['time_steps'])
    col_time, model_name = args['col_time'], args['model_name']
    col_reserve = str_to_list(args['col_reserve'])
    feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
    pre_data = data.sort_values(by=col_time)
    # Preprocess the prediction data: 1. clean nulls/outliers 2. fill missing values
    pre_data_cleaned = cleaning(pre_data, '', logger, pre_data.columns.tolist())
    pre_data = dh.fill_train_data(pre_data_cleaned)
    scaled_features = feature_scaler.transform(pre_data[features])
    return scaled_features

class NPHandler(object):
    train = False

    def __init__(self, log=None, args=None, graph=None, sess=None):
        # Defaults allow the bare NPHandler() construction used by the routes below;
        # the module-level `logger` exists once the __main__ block has run
        self.logger = log if log is not None else logger
        self.graph = graph
        self.sess = sess
        self.opt = args.parse_args_and_yaml() if args is not None else None
        self.model = None

    def get_model(self, args):
        """
        Singleton + thread lock, to keep asynchronous model loading thread-safe
        """
        try:
            with model_lock:
                # NPHandler.model = NPHandler.get_keras_model(opt)
                self.model = get_h5_model_from_mongo(args)
        except Exception as e:
            self.logger.error("Failed to load model weights: {}".format(e.args))

    @staticmethod
    def get_keras_model(opt):
        # db_loss = NorthEastLoss(opt)
        # south_loss = SouthLoss(opt)
        l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
        l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size_nwp']), name='nwp')
        env_input = Input(shape=(opt.Model['his_points'], opt.Model['input_size_env']), name='env')
        con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu',
                      kernel_regularizer=l2_reg)(nwp_input)
        d1 = Dense(32, activation='relu', name='d1', kernel_regularizer=l1_reg)(con1)
        nwp = Dense(8, activation='relu', name='d2', kernel_regularizer=l1_reg)(d1)
        output = Dense(opt.Model['output_size'], name='d5')(nwp)
        # Note: env_input is declared as a model input but does not feed the output branch
        model = Model([env_input, nwp_input], output)
        adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7,
                               amsgrad=True)
        reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=5, verbose=1)  # defined but not passed to fit()
        model.compile(loss=rmse, optimizer=adam)
        return model
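
    # Hypothetical illustration of the opt.Model keys this factory (and the
    # training methods below) read; placeholder values, not the project's real YAML:
    #   opt.Model = {'lambda_value_1': 0.01, 'lambda_value_2': 0.01,
    #                'time_step': 16, 'input_size_nwp': 24,
    #                'his_points': 16, 'input_size_env': 5,
    #                'output_size': 16, 'learning_rate': 0.001,
    #                'batch_size': 64, 'epoch': 100, 'patience': 9,
    #                'add_train': False}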

    def train_init(self, opt, args):
        try:
            if opt.Model['add_train']:
                # Incremental training: start from the previously stored model
                base_train_model = get_h5_model_from_mongo(args)
                base_train_model.summary()
                self.logger.info("Loaded base model for incremental training")
            else:
                base_train_model = self.get_keras_model(opt)
            return base_train_model
        except Exception as e:
            self.logger.error("Failed to load model weights for incremental training: {}".format(e.args))

    def training(self, opt, args, train_and_valid_data):
        model = self.train_init(opt, args)
        train_X, train_Y, valid_X, valid_Y = train_and_valid_data
        print("----------", np.array(train_X[0]).shape)
        print("++++++++++", np.array(train_X[1]).shape)
        # weight_lstm_1, bias_lstm_1 = model.get_layer('d1').get_weights()
        # print("weight_lstm_1 = ", weight_lstm_1)
        # print("bias_lstm_1 = ", bias_lstm_1)
        check_point = ModelCheckpoint(filepath='./var/' + 'fmi.h5', monitor='val_loss',
                                      save_best_only=True, mode='auto')
        early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
        # tbCallBack = TensorBoard(log_dir='../figure',
        #                          histogram_freq=0,
        #                          write_graph=True,
        #                          write_images=True)
        history = model.fit(train_X, train_Y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2,
                            validation_data=(valid_X, valid_Y), callbacks=[check_point, early_stop], shuffle=False)
        loss = np.round(history.history['loss'], decimals=5)
        val_loss = np.round(history.history['val_loss'], decimals=5)
        self.logger.info("-----Training finished after {} epochs-----".format(len(loss)))
        self.logger.info("Training loss: {}".format(loss))
        self.logger.info("Validation loss: {}".format(val_loss))
        return model

    def predict(self, test_X, batch_size=1):
        result = self.model.predict(test_X, batch_size=batch_size)
        self.logger.info("Prediction executed")
        return result

def build_model(data, args):
    # NOTE (assumption): the original referenced scaled_features / scaled_target and
    # the two scaler byte streams without ever creating them; the reconstruction
    # below mirrors what pre_data_handler() expects to load back.
    features, target = str_to_list(args['features']), args['target']
    feature_scaler = MinMaxScaler(feature_range=(0, 1))
    target_scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_features = feature_scaler.fit_transform(data[features])
    scaled_target = target_scaler.fit_transform(data[[target]])
    # Serialize both fitted scalers so they can be stored alongside the model
    feature_scaler_bytes, target_scaler_bytes = BytesIO(), BytesIO()
    joblib.dump(feature_scaler, feature_scaler_bytes)
    feature_scaler_bytes.seek(0)
    joblib.dump(target_scaler, target_scaler_bytes)
    target_scaler_bytes.seek(0)
    # Split into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(scaled_features, scaled_target,
                                                        test_size=0.2, random_state=43)
    # Build a fully connected regression network (the original comment said LSTM, but the stack is Dense-only)
    model = Sequential([
        Dense(64, input_dim=X_train.shape[1], activation='relu'),  # input + first hidden layer, 64 units
        Dropout(0.2),
        Dense(32, activation='relu'),  # second hidden layer, 32 units
        Dropout(0.3),                  # randomly drop 30% of activations
        Dense(1, activation='linear')  # single output unit for regression
    ])
    # Compile the model
    model.compile(optimizer='adam', loss='mean_squared_error')
    # EarlyStopping and ReduceLROnPlateau callbacks
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1)
    # Train the model on the GPU ('/GPU:1' assumes at least two visible GPUs)
    with tf.device('/GPU:1'):
        history = model.fit(X_train, y_train,
                            epochs=100,
                            batch_size=32,
                            validation_data=(X_test, y_test),
                            verbose=2,
                            shuffle=False,
                            callbacks=[early_stopping, reduce_lr])
    draw_loss(history)
    return model, feature_scaler_bytes, target_scaler_bytes

@app.route('/model_training_bp', methods=['POST'])
def model_training_bp():
    # Record program start time
    start_time = time.time()
    result = {}
    success = 0
    args = {}
    nh = NPHandler()
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        power_df = get_data_from_mongo(args)
        train_x, valid_x, train_y, valid_y, scaled_train_bytes = train_data_handler(power_df, args)
        # NOTE (assumption): `opt` (model hyper-parameters) is not defined in this
        # file; it is expected to come from the project's argument/YAML parser.
        np_model = nh.training(opt, args, [train_x, train_y, valid_x, valid_y])
        # Second, standalone BP model; its artifacts are not stored in this flow
        model, feature_scaler_bytes, target_scaler_bytes = build_model(power_df, args)
        insert_h5_model_into_mongo(np_model, scaled_train_bytes, args)
        success = 1
    except Exception as e:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result

@app.route('/model_prediction_bp', methods=['POST'])
def model_prediction_bp():
    # Record program start time
    start_time = time.time()
    result = {}
    success = 0
    args = {}
    nh = NPHandler()
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        power_df = get_data_from_mongo(args)
        scaled_features = pre_data_handler(power_df, args)
        nh.get_model(args)  # load the stored model before predicting
        pre_data = nh.predict(scaled_features)
        insert_data_into_mongo(pre_data, args)
        success = 1
    except Exception as e:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result

if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("model_training_bp log")
    from waitress import serve
    print("server start!")  # printed before serve(), since serve() blocks
    serve(app, host="0.0.0.0", port=10103, threads=4)
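
# Example invocation once the service is running (field names taken from the
# handlers above with placeholder values; Mongo connection parameters consumed
# by common.database_dml are deployment-specific and omitted here):
#   curl -X POST http://localhost:10103/model_training_bp \
#        -d 'col_time=dateTime' -d 'features=featA,featB' -d 'target=power'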