import numpy as np
from sklearn.model_selection import train_test_split
from flask import Flask, request
import time
import traceback
import logging
from sklearn.preprocessing import MinMaxScaler
from io import BytesIO
import joblib
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import tensorflow as tf
from common.database_dml import get_data_from_mongo, insert_h5_model_into_mongo
from common.processing_data_common import missing_features, str_to_list
import threading

app = Flask('model_training_lstm_service')

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("model_training_lstm log")

# Create a lock object
lock = threading.Lock()


def rmse(y_true, y_pred):
    return tf.math.sqrt(tf.reduce_mean(tf.square(y_true - y_pred)))


# Build time-series samples with a sliding window
def create_sequences(data_features, data_target, time_steps):
    X, y = [], []
    if len(data_features) < time_steps:
        print("The data length must not be smaller than the time step!")
        return np.array(X), np.array(y)
    else:
        for i in range(len(data_features) - time_steps + 1):
            X.append(data_features[i:(i + time_steps)])
            if len(data_target) > 0:
                y.append(data_target[i + time_steps - 1])
        return np.array(X), np.array(y)
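
# Illustrative example (not part of the original service; shapes assume 100 scaled
# rows, 4 feature columns and time_steps=16):
#   X, y = create_sequences(scaled_features, scaled_target, 16)
#   X.shape  # (85, 16, 4) -- one window per start position
#   y.shape  # (85, 1)     -- target taken from the last step of each window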


def build_model(data, args):
    tf.keras.backend.clear_session()  # clear the current graph and session
    col_time, time_steps, features, target = args['col_time'], int(args['time_steps']), str_to_list(args['features']), args['target']
    if 'is_limit' in data.columns:
        data = data[data['is_limit'] == False]
    # Drop days whose average feature missing rate exceeds 20%
    df = missing_features(data, features, col_time)
    train_data = df.ffill().bfill().sort_values(by=col_time)
    # X_train, X_test, y_train, y_test = process_data(df_clean, params)
    # Create scalers for the features and the target
    feature_scaler = MinMaxScaler(feature_range=(0, 1))
    target_scaler = MinMaxScaler(feature_range=(0, 1))
    # Scale the features and the target
    scaled_features = feature_scaler.fit_transform(train_data[features])
    scaled_target = target_scaler.fit_transform(train_data[[target]])
    # Serialize both scalers
    feature_scaler_bytes = BytesIO()
    joblib.dump(feature_scaler, feature_scaler_bytes)
    feature_scaler_bytes.seek(0)  # reset pointer to the beginning of the byte stream
    target_scaler_bytes = BytesIO()
    joblib.dump(target_scaler, target_scaler_bytes)
    target_scaler_bytes.seek(0)
    X, y = create_sequences(scaled_features, scaled_target, time_steps)
    # Split into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=43)
    # Build the LSTM model
    model = Sequential()
    model.add(LSTM(units=50, return_sequences=False, input_shape=(time_steps, X_train.shape[2])))
    model.add(Dense(1))  # single output value
    # Compile the model
    model.compile(optimizer='adam', loss='mean_squared_error')
    # Define the EarlyStopping and ReduceLROnPlateau callbacks
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1)
    # Train the model
    history = model.fit(X_train, y_train,
                        epochs=100,
                        batch_size=32,
                        validation_data=(X_test, y_test),
                        verbose=2,
                        callbacks=[early_stopping, reduce_lr])
    # draw_loss(history)
    return model, feature_scaler_bytes, target_scaler_bytes
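
# Illustrative call (a sketch; the DataFrame and column names below are assumptions
# for the example, not values defined in this service):
#   model, f_scaler_bytes, t_scaler_bytes = build_model(
#       power_df,
#       {'col_time': 'dateTime', 'time_steps': '16',
#        'features': 'temp,humidity,windSpeed', 'target': 'realPower'})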


# Note: this local helper shadows the str_to_list imported from common.processing_data_common
def str_to_list(arg):
    if arg == '':
        return []
    else:
        return arg.split(',')


@app.route('/model_training_lstm', methods=['POST'])
def model_training_lstm():
    # Record the start time of the request
    start_time = time.time()
    result = {}
    success = 0
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        power_df = get_data_from_mongo(args)
        model, feature_scaler_bytes, target_scaler_bytes = build_model(power_df, args)
        insert_h5_model_into_mongo(model, feature_scaler_bytes, target_scaler_bytes, args)
        success = 1
    except Exception:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
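
# Example request (a sketch; the Mongo-related form fields depend on what
# get_data_from_mongo expects, so their names here are assumptions):
#   curl -X POST http://localhost:10096/model_training_lstm \
#        -d 'col_time=dateTime' -d 'time_steps=16' \
#        -d 'features=temp,humidity,windSpeed' -d 'target=realPower' \
#        -d 'mongodb_database=db_power' -d 'mongodb_read_table=power_history'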


if __name__ == "__main__":
    print("Program starts execution!")
    from waitress import serve
    print("server start!")
    serve(app, host="0.0.0.0", port=10096, threads=4)