import numpy as np
from sklearn.model_selection import train_test_split
from flask import Flask, request
import time
import traceback
import logging
from sklearn.preprocessing import MinMaxScaler
from io import BytesIO
import joblib
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import tensorflow as tf
from common.database_dml import get_data_from_mongo, insert_h5_model_into_mongo
from common.processing_data_common import missing_features, str_to_list
import random
import matplotlib.pyplot as plt

app = Flask('model_training_lightgbm——service')


def rmse(y_true, y_pred):
    return tf.math.sqrt(tf.reduce_mean(tf.square(y_true - y_pred)))


def draw_loss(history):
    # Plot training and validation loss curves
    plt.figure(figsize=(20, 8))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Loss Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()


# Build time-series samples: each sample is `time_steps` consecutive feature rows,
# labelled with the target value at the last step of the window.
def create_sequences(data_features, data_target, time_steps):
    X, y = [], []
    if len(data_features) < time_steps:
        # Not enough rows to form a single window
        return np.array(X), np.array(y)
    for i in range(len(data_features) - time_steps + 1):
        X.append(data_features[i:i + time_steps])
        if len(data_target) > 0:
            y.append(data_target[i + time_steps - 1])
    return np.array(X), np.array(y)


def build_model(data, args):
    sleep_time = random.uniform(1, 20)  # Random delay of 1-20 seconds
    time.sleep(sleep_time)
    tf.keras.backend.clear_session()  # Clear the current graph and session
    col_time, time_steps, features, target = (args['col_time'], int(args['time_steps']),
                                              str_to_list(args['features']), args['target'])
    if 'is_limit' in data.columns:
        data = data[data['is_limit'] == False]
    # Drop days whose average feature missing rate exceeds 20%
    data = missing_features(data, features, col_time)
    train_data = data.sort_values(by=col_time).ffill().bfill()
    # X_train, X_test, y_train, y_test = process_data(df_clean, params)
    # Create scalers for the features and the target
    feature_scaler = MinMaxScaler(feature_range=(0, 1))
    target_scaler = MinMaxScaler(feature_range=(0, 1))
    # Scale the features and the target
    scaled_features = feature_scaler.fit_transform(train_data[features])
    scaled_target = target_scaler.fit_transform(train_data[[target]])
    # Serialize both scalers
    feature_scaler_bytes = BytesIO()
    joblib.dump(feature_scaler, feature_scaler_bytes)
    feature_scaler_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
    target_scaler_bytes = BytesIO()
    joblib.dump(target_scaler, target_scaler_bytes)
    target_scaler_bytes.seek(0)
    X, y = create_sequences(scaled_features, scaled_target, time_steps)
    # Split into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=43)

    # Build the LSTM model
    model = Sequential()
    model.add(LSTM(units=64, return_sequences=False, input_shape=(time_steps, X_train.shape[2])))
    model.add(Dense(1))  # Single output value

    # Compile the model
    model.compile(optimizer='adam', loss='mean_squared_error')

    # EarlyStopping and ReduceLROnPlateau callbacks
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1)

    # Train the model on the GPU
    with tf.device('/GPU:1'):
        history = model.fit(X_train, y_train,
                            epochs=100,
                            batch_size=32,
                            validation_data=(X_test, y_test),
                            verbose=2,
                            shuffle=False,
                            callbacks=[early_stopping, reduce_lr])
    # draw_loss(history)
    return model, feature_scaler_bytes, target_scaler_bytes
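
# Sketch (not called by this service): build_model() returns the two MinMaxScalers
# serialized with joblib into BytesIO buffers. A consumer of the stored model could
# restore them roughly like this before inverse-transforming predictions. The function
# name and usage below are illustrative assumptions, not part of the training flow.
def load_scalers_example(feature_scaler_bytes, target_scaler_bytes):
    feature_scaler_bytes.seek(0)
    target_scaler_bytes.seek(0)
    feature_scaler = joblib.load(feature_scaler_bytes)
    target_scaler = joblib.load(target_scaler_bytes)
    # e.g. y_pred = target_scaler.inverse_transform(model.predict(X_new))
    return feature_scaler, target_scaler
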
@app.route('/model_training_lstm', methods=['POST'])
def model_training_lstm():
    # Record the start time of the request
    start_time = time.time()
    result = {}
    success = 0
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        power_df = get_data_from_mongo(args)
        model, feature_scaler_bytes, target_scaler_bytes = build_model(power_df, args)
        insert_h5_model_into_mongo(model, feature_scaler_bytes, target_scaler_bytes, args)
        success = 1
    except Exception as e:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("model_training_lightgbm log")
    from waitress import serve
    serve(app, host="0.0.0.0", port=10096, threads=4)
    print("server start!")
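
# Example client call (sketch): the handler reads its parameters from request.values,
# so a form-encoded POST works. The field names below are the ones build_model()
# reads; the example values, the comma-separated format assumed for str_to_list(),
# and any extra fields get_data_from_mongo() needs for its MongoDB query are assumptions.
#
#   import requests
#   payload = {
#       "col_time": "dateTime",                        # timestamp column
#       "time_steps": "48",                            # LSTM window length
#       "features": "temperature,humidity,windSpeed",  # feature column names
#       "target": "power",                             # prediction target column
#   }
#   print(requests.post("http://localhost:10096/model_training_lstm", data=payload).json())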