@@ -0,0 +1,179 @@
+import pandas as pd
+import numpy as np
+from pymongo import MongoClient
+from sklearn.model_selection import train_test_split
+from flask import Flask, request
+import time
+import traceback
+import logging
+from sklearn.preprocessing import MinMaxScaler
+from io import BytesIO
+import joblib
+from tensorflow.keras.models import Sequential
+from tensorflow.keras.layers import LSTM, Dense
+from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
+import matplotlib.pyplot as plt
+import tensorflow as tf
+
+
+app = Flask('model_training_lstm_service')
+
+def draw_loss(history):
+    # Plot training-set and validation-set loss
+    plt.figure(figsize=(20, 8))
+    plt.plot(history.history['loss'], label='Training Loss')
+    plt.plot(history.history['val_loss'], label='Validation Loss')
+    plt.title('Loss Curve')
+    plt.xlabel('Epochs')
+    plt.ylabel('Loss')
+    plt.legend()
+    plt.show()  # Note: plt.show() blocks or is a no-op in a headless service; plt.savefig may be preferable
+
+def get_data_from_mongo(args):
+    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
+    mongodb_database, mongodb_read_table = args['mongodb_database'], args['mongodb_read_table']
+    timeBegin, timeEnd = args['timeBegin'], args['timeEnd']
+    client = MongoClient(mongodb_connection)
+    # Select the database (MongoDB creates it automatically if it does not exist)
+    db = client[mongodb_database]
+    collection = db[mongodb_read_table]  # collection name
+    query = {"dateTime": {"$gte": timeBegin, "$lte": timeEnd}}
+    cursor = collection.find(query)
+    data = list(cursor)
+    df = pd.DataFrame(data)
+    # Drop the _id field (optional)
+    if '_id' in df.columns:
+        df = df.drop(columns=['_id'])
+    client.close()
+    return df
+
+
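+# Assumed source-document shape (field names are illustrative, not defined by this service):
+#   {"dateTime": "2024-01-01 00:00:00", "temp": 3.2, "wind_speed": 5.1, "power": 12.4}
+# If dateTime is stored as a string, the $gte/$lte range query above compares
+# lexicographically, so timeBegin/timeEnd must use the same string format.
+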
+def insert_model_into_mongo(model, feature_scaler_bytes, target_scaler_bytes, args):
+    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
+    mongodb_database, scaler_table, model_table, model_name = (
+        args['mongodb_database'], args['scaler_table'], args['model_table'], args['model_name'])
+    client = MongoClient(mongodb_connection)
+    db = client[mongodb_database]
+    collection = db[scaler_table]  # collection name
+    # Save the scalers in MongoDB as binary data
+    collection.insert_one({
+        "feature_scaler": feature_scaler_bytes.read(),
+        "target_scaler": target_scaler_bytes.read()
+    })
+    print("scalers inserted successfully!")
+    model_table = db[model_table]
+    # Create a BytesIO buffer
+    model_buffer = BytesIO()
+    # Save the model in HDF5 format into memory (BytesIO)
+    model.save(model_buffer, save_format='h5')
+    # Move the pointer back to the start of the buffer
+    model_buffer.seek(0)
+    # Read the model's binary data
+    model_data = model_buffer.read()
+    # Store the model in MongoDB
+    model_table.insert_one({
+        "model_name": model_name,
+        "model_data": model_data
+    })
+    print("Model saved to MongoDB successfully!")
+
+
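+# A minimal sketch (an assumption, not part of this service) of how the stored
+# model and scalers could be read back; h5py can open the in-memory HDF5 buffer:
+#   import h5py
+#   from tensorflow.keras.models import load_model
+#   doc = db[model_table_name].find_one({"model_name": model_name})   # hypothetical names
+#   model = load_model(h5py.File(BytesIO(doc['model_data']), 'r'))
+#   scaler_doc = db[scaler_table_name].find_one()
+#   feature_scaler = joblib.load(BytesIO(scaler_doc['feature_scaler']))
+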
+def rmse(y_true, y_pred):
+    return tf.math.sqrt(tf.reduce_mean(tf.square(y_true - y_pred)))
+
+
+# Build time-series sequences
+def create_sequences(data_features, data_target, time_steps):
+    X, y = [], []
+    if len(data_features) < time_steps:
+        print("Data length must not be shorter than the number of time steps!")
+        return np.array(X), np.array(y)
+    else:
+        for i in range(len(data_features) - time_steps + 1):
+            X.append(data_features[i:(i + time_steps)])
+            if len(data_target) > 0:
+                y.append(data_target[i + time_steps - 1])
+        return np.array(X), np.array(y)
+
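+# For example (illustrative): with 100 scaled rows and time_steps=24, this yields
+# X of shape (77, 24, n_features) and y of shape (77, 1), since
+# range(100 - 24 + 1) produces 77 sliding windows.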
+
+def build_model(data, args):
+    begin_time, end_time, col_time = args['begin_time'], args['end_time'], args['col_time']
+    # Request parameters arrive as strings, so cast/split them explicitly
+    time_steps = int(args['time_steps'])
+    features = str_to_list(args['features'])
+    target = args['target']
+    # Restrict to the configured training window
+    train_data = data[(data[col_time] >= begin_time) & (data[col_time] < end_time)]
+    # X_train, X_test, y_train, y_test = process_data(df_clean, params)
+    # Create scalers for the features and the target
+    feature_scaler = MinMaxScaler(feature_range=(0, 1))
+    target_scaler = MinMaxScaler(feature_range=(0, 1))
+    # Scale the features and the target, fitting on the training window
+    scaled_features = feature_scaler.fit_transform(train_data[features])
+    scaled_target = target_scaler.fit_transform(train_data[[target]])
+    # Serialize both scalers
+    feature_scaler_bytes = BytesIO()
+    joblib.dump(feature_scaler, feature_scaler_bytes)
+    feature_scaler_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
+    target_scaler_bytes = BytesIO()
+    joblib.dump(target_scaler, target_scaler_bytes)
+    target_scaler_bytes.seek(0)
+    X, y = create_sequences(scaled_features, scaled_target, time_steps)
+    # Split into training and test sets
+    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=43)
+
+    # Build the LSTM model
+    model = Sequential()
+    model.add(LSTM(units=50, return_sequences=False, input_shape=(time_steps, X_train.shape[2])))
+    model.add(Dense(1))  # single output value
+    # Compile the model
+    model.compile(optimizer='adam', loss='mean_squared_error')
+    # Define the EarlyStopping and ReduceLROnPlateau callbacks
+    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
+    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1)
+    # Train the model
+    history = model.fit(X_train, y_train,
+                        epochs=100,
+                        batch_size=32,
+                        validation_data=(X_test, y_test),
+                        verbose=2,
+                        callbacks=[early_stopping, reduce_lr])
+    draw_loss(history)
+    return model, feature_scaler_bytes, target_scaler_bytes
+
+
+def str_to_list(arg):
+    if arg == '':
+        return []
+    else:
+        return arg.split(',')
+
+
+@app.route('/model_training_lstm', methods=['POST'])
+def model_training_lstm():
+    # Record the start time
+    start_time = time.time()
+    result = {}
+    success = 0
+    args = {}
+    print("Program starts execution!")
+    try:
+        args = request.values.to_dict()
+        print('args', args)
+        logger.info(args)
+        power_df = get_data_from_mongo(args)
+        model, feature_scaler_bytes, target_scaler_bytes = build_model(power_df, args)
+        insert_model_into_mongo(model, feature_scaler_bytes, target_scaler_bytes, args)
+        success = 1
+    except Exception:
+        my_exception = traceback.format_exc()
+        my_exception = my_exception.replace("\n", "\t")  # assign the result; str.replace does not mutate in place
+        result['msg'] = my_exception
+    end_time = time.time()
+
+    result['success'] = success
+    result['args'] = args
+    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+    print("Program execution ends!")
+    return result
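+
+# Illustrative invocation (all values are hypothetical, not defined by this service);
+# the parameter names match the args read by the handlers above:
+#   curl -X POST http://localhost:10096/model_training_lstm \
+#     -d 'mongodb_database=mydb' -d 'mongodb_read_table=power_data' \
+#     -d 'timeBegin=2024-01-01' -d 'timeEnd=2024-06-30' \
+#     -d 'begin_time=2024-01-01' -d 'end_time=2024-06-01' -d 'col_time=dateTime' \
+#     -d 'time_steps=24' -d 'features=temp,wind_speed' -d 'target=power' \
+#     -d 'scaler_table=scalers' -d 'model_table=models' -d 'model_name=lstm_v1'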
+
+
+if __name__ == "__main__":
+    print("Program starts execution!")
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger("model_training_lstm log")
+    from waitress import serve
+    print("server start!")
+    serve(app, host="0.0.0.0", port=10096)  # serve() blocks, so it runs last