import logging
import time
import traceback

import lightgbm as lgb
import numpy as np
from flask import Flask, request
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
from sklearn.svm import SVR

from common.database_dml import get_data_from_mongo, insert_pickle_model_into_mongo
from common.processing_data_common import missing_features, str_to_list
from data_processing.data_operation.weight import WEIGHT_REGISTER

app = Flask('model_training_ml——service')

# Bound at import time so the request handler can log regardless of how the
# app is served (the original only bound it under the __main__ guard).
logger = logging.getLogger("model_training_ml log")


def train_lgb(data_split, categorical_features, model_params, num_boost_round=100, sample_weight=None):
    """Train a LightGBM regressor and predict on the held-out split.

    Args:
        data_split: Sequence ``(X_train, X_test, y_train, y_test)``.
        categorical_features: Forwarded to ``lgb.Dataset`` as
            ``categorical_feature``.
        model_params: Dict merged over the default parameters below
            (caller-supplied keys win).
        num_boost_round: Number of boosting rounds passed to ``lgb.train``.
        sample_weight: Optional per-row weights for the training set; must
            have one entry per row of ``X_train``.

    Returns:
        Tuple ``(booster, y_pred)`` where ``y_pred`` is the prediction on
        ``X_test``.
    """
    X_train, X_test, y_train, y_test = data_split
    # Build the LightGBM datasets; the eval set references the train set.
    lgb_train = lgb.Dataset(X_train, y_train, categorical_feature=categorical_features, weight=sample_weight)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)
    # Default parameters, overridden by whatever the caller supplies.
    params = {
        'objective': 'regression',
        'metric': 'rmse',
        'boosting_type': 'gbdt',
        'verbose': 1
    }
    params.update(model_params)
    # Train the model.
    print('Starting training...')
    gbm = lgb.train(
        params,
        lgb_train,
        num_boost_round=num_boost_round,
        valid_sets=[lgb_train, lgb_eval],
    )
    y_pred = gbm.predict(X_test, num_iteration=gbm.best_iteration)
    return gbm, y_pred


def train_svr(data_split, model_params, sample_weight=None):
    """Train a MinMax-scaled SVR pipeline and predict on the held-out split.

    Args:
        data_split: Sequence ``(X_train, X_test, y_train, y_test)``.
        model_params: Keyword arguments forwarded to ``SVR``.
        sample_weight: Optional per-row weights for the training set,
            routed to the ``model`` step of the pipeline.

    Returns:
        Tuple ``(pipeline, y_pred)`` where ``y_pred`` is the prediction on
        ``X_test``.
    """
    X_train, X_test, y_train, y_test = data_split
    svr = Pipeline([('scaler', MinMaxScaler()), ('model', SVR(**model_params))])
    # Train the model; the "model__" prefix routes the weights to the SVR step.
    print('Starting training...')
    svr.fit(X_train, y_train, model__sample_weight=sample_weight)
    y_pred = svr.predict(X_test)
    return svr, y_pred


def build_model(df, args):
    """Clean the data, train the requested model and print test metrics.

    Args:
        df: DataFrame holding the feature columns, the label column and the
            time column named in ``args``.
        args: Request parameters (string-valued dict). Keys read here:
            ``numerical_features``, ``categorical_features``, ``label``,
            ``model_name``, ``model_params``, ``col_time``, ``model_type``,
            optionally ``sample_weight``, and ``num_boost_round`` when
            ``model_type`` is ``"lightgbm"``.

    Returns:
        Tuple ``(model, features)`` where ``features`` is the list of
        feature column names used for training.

    Raises:
        ValueError: If ``model_type`` is neither ``"lightgbm"`` nor ``"svr"``.
        KeyError: If a required key is missing from ``args``.
    """
    np.random.seed(42)
    # Parameters.
    numerical_features = str_to_list(args['numerical_features'])
    categorical_features = str_to_list(args['categorical_features'])
    label = args['label']
    # Not used in this function; still read so a missing key fails early, as before.
    model_name = args['model_name']  # noqa: F841
    # SECURITY: eval() executes arbitrary code taken from the HTTP request.
    # Kept for compatibility with existing callers; this endpoint must only be
    # reachable by trusted clients. If model_params is always a plain literal,
    # ast.literal_eval would be the safe replacement - confirm before switching.
    model_params = eval(args['model_params'])
    col_time = args['col_time']
    features = numerical_features + categorical_features
    print("features:************", features)
    if 'is_limit' in df.columns:
        # "== False" is kept on purpose: the column dtype is not known here.
        df = df[df['is_limit'] == False]  # noqa: E712
    # Drop days whose average feature missing rate exceeds 20% (per the
    # original comment; the actual rule lives in missing_features).
    df = missing_features(df, features, col_time)
    df = df[~np.isnan(df[label])]
    # Split into train and test sets; shuffle=False keeps row order, so the
    # training set is the leading 80% of the rows.
    X_train, X_test, y_train, y_test = train_test_split(df[features], df[label], test_size=0.2,
                                                        random_state=42, shuffle=False)

    model_type = args['model_type']
    sample_weight = None
    # Sample weights: either a registered weighting function or a DataFrame column.
    if 'sample_weight' in args:
        if args['sample_weight'] in WEIGHT_REGISTER:
            sample_weight = WEIGHT_REGISTER[args['sample_weight']](df[label].values.reshape(-1), **args)
        elif args['sample_weight'] in df.columns.tolist():
            sample_weight = df[args['sample_weight']].values.reshape(-1)
        else:
            sample_weight = None
            print('sample_weight is neither in the predefined weights nor a column of the DataFrame, not applicable')
    if sample_weight is not None:
        # The weights above cover every row of df, but only the training rows
        # are fitted. Because the split is unshuffled, the training rows are
        # the first len(X_train) rows, so slice the weights to match.
        sample_weight = np.asarray(sample_weight).reshape(-1)[:len(X_train)]

    # Distinguish conventional ML models from LightGBM; only SVR is
    # instantiated here, more can be added later.
    if model_type == "lightgbm":
        num_boost_round = int(args['num_boost_round'])
        model, y_pred = train_lgb([X_train, X_test, y_train, y_test], categorical_features, model_params,
                                  num_boost_round, sample_weight=sample_weight)
    elif model_type == "svr":
        model, y_pred = train_svr([X_train, X_test, y_train, y_test], model_params, sample_weight=sample_weight)
    else:
        raise ValueError("Invalid model_type, must be one of [lightgbm, svr]")

    # Evaluation on the held-out split.
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    print(f'The test rmse is: {rmse},"The test mae is:"{mae}')
    return model, features


@app.route('/model_training_ml', methods=['POST'])
def model_training_ml():
    """Handle a training request: load data, train, and store the model.

    Returns:
        Dict returned to Flask as the response body, with keys ``success``
        (1 or 0), ``args``, ``start_time``, ``end_time`` and, on failure,
        ``msg`` holding the traceback with newlines replaced by tabs.
    """
    # Record the program start time.
    start_time = time.time()
    result = {}
    success = 0
    # Pre-bound so the response can always include it, even if reading the
    # request itself fails.
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        power_df = get_data_from_mongo(args)
        model, features = build_model(power_df, args)
        insert_pickle_model_into_mongo(model, args, features=features)
        success = 1
    except Exception:
        # Top-level boundary: report the failure to the caller instead of
        # letting the request crash.
        logger.exception("model_training_ml failed")
        my_exception = traceback.format_exc()
        # str.replace returns a new string; the original discarded the result.
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    from waitress import serve

    # serve() blocks until the server shuts down, so announce start-up first.
    print("server start!")
    serve(app, host="0.0.0.0", port=10125)