# model_training_bp.py

import time
import random
import logging
import traceback
from io import BytesIO

import numpy as np
import joblib
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from flask import Flask, request

from common.database_dml import get_data_from_mongo, insert_h5_model_into_mongo
from common.processing_data_common import missing_features, str_to_list

# Configure logging at module level so `logger` exists for any entry point,
# not only when the module is run as __main__.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("model_training_bp log")


app = Flask('model_training_bp_service')


def rmse(y_true, y_pred):
    # Root-mean-squared error; usable as a custom Keras metric or loss.
    return tf.math.sqrt(tf.reduce_mean(tf.square(y_true - y_pred)))


def draw_loss(history):
    # Plot training and validation loss curves.
    plt.figure(figsize=(20, 8))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Loss Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()  # blocks under an interactive backend; consider plt.savefig in server contexts


def build_model(data, args):
    # Build, train, and return the model together with its fitted scalers.
    sleep_time = random.uniform(1, 20)  # random 1-20 second stagger between concurrent jobs
    time.sleep(sleep_time)
    tf.keras.backend.clear_session()  # clear the current graph and session
    # Fix the random seeds for reproducibility
    np.random.seed(42)      # NumPy seed
    tf.random.set_seed(42)  # TensorFlow seed
    col_time, features, target = args['col_time'], str_to_list(args['features']), args['target']
    if 'is_limit' in data.columns:
        data = data[data['is_limit'] == False]
    # Drop days whose average feature missing rate exceeds 20%
    data = missing_features(data, features, col_time)
    train_data = data.sort_values(by=col_time).ffill().bfill()  # fillna(method=...) is deprecated
    # Create separate scalers for the features and the target
    feature_scaler = MinMaxScaler(feature_range=(0, 1))
    target_scaler = MinMaxScaler(feature_range=(0, 1))
    # Scale features and target to [0, 1]
    scaled_features = feature_scaler.fit_transform(train_data[features])
    scaled_target = target_scaler.fit_transform(train_data[[target]])
    # Serialize both scalers into in-memory byte streams
    feature_scaler_bytes = BytesIO()
    joblib.dump(feature_scaler, feature_scaler_bytes)
    feature_scaler_bytes.seek(0)  # reset pointer to the beginning of the byte stream
    target_scaler_bytes = BytesIO()
    joblib.dump(target_scaler, target_scaler_bytes)
    target_scaler_bytes.seek(0)
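    # Sketch (assumption, not in this file): a consumer that later reads these
    # bytes back out of Mongo can rebuild the scalers with joblib, e.g.:
    #   feature_scaler = joblib.load(BytesIO(raw_feature_scaler_bytes))
    #   X_new = feature_scaler.transform(new_df[features])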
    # Split into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(scaled_features, scaled_target, test_size=0.2, random_state=43)
    # Build the network: despite the endpoint's history with LSTMs, this is a
    # plain feed-forward regressor built from Dense layers
    model = Sequential([
        Dense(64, input_dim=X_train.shape[1], activation='relu'),  # input/hidden layer, 64 units
        Dropout(0.2),
        Dense(32, activation='relu'),  # hidden layer, 32 units
        Dropout(0.3),                  # randomly zeroes 30% of activations during training
        Dense(1, activation='linear')  # output layer, 1 unit (regression)
    ])
    # Compile the model
    model.compile(optimizer='adam', loss='mean_squared_error')
    # EarlyStopping and ReduceLROnPlateau callbacks
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1)
    # Train on the second GPU; '/GPU:1' requires at least two visible GPUs,
    # otherwise TensorFlow raises an error
    with tf.device('/GPU:1'):
        history = model.fit(X_train, y_train,
                            epochs=100,
                            batch_size=32,
                            validation_data=(X_test, y_test),
                            verbose=2,
                            shuffle=False,
                            callbacks=[early_stopping, reduce_lr])
    draw_loss(history)
    return model, feature_scaler_bytes, target_scaler_bytes
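

# Sketch (hypothetical, local testing only): calling build_model directly with a
# synthetic DataFrame. The column names here are made up; build_model itself only
# requires the args keys 'col_time', 'features' (a string str_to_list can parse,
# assumed comma-separated), and 'target', plus importable common.* helpers.
#
#   import pandas as pd
#   df = pd.DataFrame({
#       'date_time': pd.date_range('2024-01-01', periods=500, freq='15min'),
#       'temp': np.random.rand(500),
#       'wind_speed': np.random.rand(500),
#       'power': np.random.rand(500),
#   })
#   args = {'col_time': 'date_time', 'features': 'temp,wind_speed', 'target': 'power'}
#   model, fs_bytes, ts_bytes = build_model(df, args)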


@app.route('/model_training_bp', methods=['POST'])
def model_training_bp():
    # Record the request start time
    start_time = time.time()
    result = {}
    success = 0
    args = {}  # initialized so the response can echo args even on early failure
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        power_df = get_data_from_mongo(args)
        model, feature_scaler_bytes, target_scaler_bytes = build_model(power_df, args)
        insert_h5_model_into_mongo(model, feature_scaler_bytes, target_scaler_bytes, args)
        success = 1
    except Exception:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")  # str.replace returns a new string; reassign it
        result['msg'] = my_exception
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
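

# Sketch (hypothetical client, not part of this service): the endpoint reads its
# parameters from the POST form / query string. 'col_time', 'features', and
# 'target' are consumed by build_model above; any Mongo source/sink keys are
# whatever get_data_from_mongo / insert_h5_model_into_mongo expect (not defined
# in this file).
#
#   import requests
#   resp = requests.post(
#       "http://localhost:10103/model_training_bp",
#       data={
#           "col_time": "date_time",
#           "features": "temp,wind_speed",
#           "target": "power",
#           # ...plus the Mongo keys required by the common.database_dml helpers
#       },
#   )
#   print(resp.json())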


if __name__ == "__main__":
    print("Program starts execution!")
    from waitress import serve
    print("server start!")  # printed before serve(), which blocks until shutdown
    serve(app, host="0.0.0.0", port=10103, threads=4)