123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- import lightgbm as lgb
- import numpy as np
- from sklearn.model_selection import train_test_split
- from sklearn.metrics import mean_squared_error, mean_absolute_error
- from flask import Flask, request
- import time
- import traceback
- import logging
- from common.database_dml import get_data_from_mongo, insert_pickle_model_into_mongo
- from common.processing_data_common import missing_features, str_to_list
- from sklearn.pipeline import Pipeline
- from sklearn.svm import SVR
- from sklearn.preprocessing import MinMaxScaler
- from data_processing.data_operation.weight import WEIGHT_REGISTER
# Flask application object; the /model_training_ml route in this file is registered on it.
app = Flask('model_training_ml——service')
def train_lgb(data_split, categorical_features, model_params, num_boost_round=100, sample_weight=None):
    """Fit a LightGBM booster and predict on the held-out split.

    Args:
        data_split: Sequence unpacked as ``(X_train, X_test, y_train, y_test)``.
        categorical_features: Forwarded to ``lgb.Dataset`` as
            ``categorical_feature`` for the training set.
        model_params: Parameters merged over the defaults defined below;
            entries given here win on key clashes.
        num_boost_round: Number of boosting rounds passed to ``lgb.train``.
        sample_weight: Optional weights attached to the training dataset.

    Returns:
        Tuple ``(booster, y_pred)`` where ``y_pred`` holds the predictions
        for ``X_test``.
    """
    train_x, test_x, train_y, test_y = data_split

    # Build the LightGBM datasets; the validation set references the training set.
    train_set = lgb.Dataset(
        train_x,
        train_y,
        categorical_feature=categorical_features,
        weight=sample_weight,
    )
    valid_set = lgb.Dataset(test_x, test_y, reference=train_set)

    # Start from the defaults, then let the caller's parameters override them.
    merged_params = dict(
        objective='regression',
        metric='rmse',
        boosting_type='gbdt',
        verbose=1,
    )
    merged_params.update(model_params)

    # Train while reporting metrics on both the training and validation sets.
    print('Starting training...')
    booster = lgb.train(
        merged_params,
        train_set,
        num_boost_round=num_boost_round,
        valid_sets=[train_set, valid_set],
    )

    predictions = booster.predict(test_x, num_iteration=booster.best_iteration)
    return booster, predictions
def train_svr(data_split, model_params, sample_weight=None):
    """Fit a MinMax-scaled SVR pipeline and predict on the held-out split.

    Args:
        data_split: Sequence unpacked as ``(X_train, X_test, y_train, y_test)``;
            ``y_test`` is not used here.
        model_params: Keyword arguments forwarded to ``SVR``.
        sample_weight: Optional weights routed to the ``model`` step of the
            pipeline during fitting.

    Returns:
        Tuple ``(pipeline, y_pred)`` where ``y_pred`` holds the predictions
        for ``X_test``.
    """
    train_x, test_x, train_y, _ = data_split

    steps = [
        ('scaler', MinMaxScaler()),
        ('model', SVR(**model_params)),
    ]
    pipeline = Pipeline(steps)

    # The ``model__`` prefix routes the weights to the SVR step only.
    print('Starting training...')
    pipeline.fit(train_x, train_y, model__sample_weight=sample_weight)

    predictions = pipeline.predict(test_x)
    return pipeline, predictions
def build_model(df, args):
    """Clean the data, train the requested model and print its test errors.

    Args:
        df: DataFrame containing the feature columns, the label column and,
            optionally, an ``is_limit`` column and a sample-weight column.
        args: Request parameters. Keys read here: ``numerical_features``,
            ``categorical_features``, ``label``, ``model_params``,
            ``col_time``, ``model_type`` (``"lightgbm"`` or ``"svr"``),
            optional ``sample_weight`` (a key of ``WEIGHT_REGISTER`` or a
            column name of ``df``) and, for lightgbm, optional
            ``num_boost_round`` (defaults to 100).

    Returns:
        Tuple ``(model, features)`` where ``features`` is the numerical
        feature list followed by the categorical feature list.

    Raises:
        ValueError: If ``model_type`` is neither ``"lightgbm"`` nor ``"svr"``.
    """
    np.random.seed(42)

    # Fail fast on an unsupported model type, before any data processing.
    model_type = args['model_type']
    if model_type not in ('lightgbm', 'svr'):
        raise ValueError("Invalid model_type, must be one of [lightgbm, svr]")

    numerical_features = str_to_list(args['numerical_features'])
    categorical_features = str_to_list(args['categorical_features'])
    label = args['label']
    col_time = args['col_time']
    # SECURITY: ``model_params`` comes straight from the HTTP request and is
    # passed to eval(), which executes arbitrary code. Kept for backward
    # compatibility with existing callers; consider ast.literal_eval or JSON
    # once it is confirmed that callers only send literal dicts.
    model_params = eval(args['model_params'])

    features = numerical_features + categorical_features
    print("features:************", features)

    if 'is_limit' in df.columns:
        # Element-wise comparison on a Series, so ``== False`` is intentional.
        df = df[df['is_limit'] == False]  # noqa: E712
    # Per the original comment: drop days whose average feature missing rate
    # exceeds 20% (the logic lives in ``missing_features``).
    df = missing_features(df, features, col_time)
    # notna() also copes with None / non-float labels, where np.isnan raises.
    df = df[df[label].notna()]

    # Chronological split (no shuffling): the first 80% of rows are training data.
    X_train, X_test, y_train, y_test = train_test_split(
        df[features], df[label], test_size=0.2, random_state=42, shuffle=False)

    # Optional sample weights: a registered weighting function or a DataFrame column.
    sample_weight = None
    weight_key = args.get('sample_weight')
    if weight_key is not None:
        if weight_key in WEIGHT_REGISTER:
            sample_weight = WEIGHT_REGISTER[weight_key](df[label].values.reshape(-1), **args)
        elif weight_key in df.columns:
            sample_weight = df[weight_key].values.reshape(-1)
        else:
            print('sample_weight is neither in the predefined weights nor a column of the DataFrame, not applicable')
    if sample_weight is not None:
        # The weights above cover every cleaned row, but only the training rows
        # are fitted. With shuffle=False those are the leading rows, so keep
        # just that prefix; otherwise the length mismatch makes training fail.
        sample_weight = np.asarray(sample_weight).reshape(-1)[:len(X_train)]

    # LightGBM has its own training routine; other models go through the
    # scikit-learn style trainer (only SVR for now, extensible later).
    data_split = [X_train, X_test, y_train, y_test]
    if model_type == "lightgbm":
        num_boost_round = int(args.get('num_boost_round', 100))
        model, y_pred = train_lgb(data_split, categorical_features, model_params,
                                  num_boost_round, sample_weight=sample_weight)
    else:
        model, y_pred = train_svr(data_split, model_params, sample_weight=sample_weight)

    # Evaluate on the held-out rows.
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    print(f'The test rmse is: {rmse}, The test mae is: {mae}')
    return model, features
@app.route('/model_training_ml', methods=['POST'])
def model_training_ml():
    """Handle a POST request: load data, train a model and store it.

    Reads the request values as a dict, fetches the data with
    ``get_data_from_mongo``, trains via ``build_model`` and saves the model
    with ``insert_pickle_model_into_mongo``. Any exception is caught and
    reported in the response instead of propagating.

    Returns:
        dict with ``success`` (1 on success, 0 on failure), ``args`` (the
        parsed request values, empty if parsing failed), ``start_time`` and
        ``end_time`` (local time, ``%Y-%m-%d %H:%M:%S``) and, on failure,
        ``msg`` holding the traceback with newlines replaced by tabs.
    """
    # Record the start time of the request.
    start_time = time.time()
    result = {}
    success = 0
    # Bound up front so the response can be built even if request parsing fails.
    args = {}
    # Look the logger up here instead of relying on a module global that only
    # exists when the file is run as a script.
    log = logging.getLogger("model_training_ml log")
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        log.info(args)
        power_df = get_data_from_mongo(args)
        model, features = build_model(power_df, args)
        insert_pickle_model_into_mongo(model, args, features=features)
        success = 1
    except Exception:
        # Top-level boundary: report the failure to the caller instead of raising.
        # str.replace returns a new string, so the result must be kept.
        result['msg'] = traceback.format_exc().replace("\n", "\t")
        log.exception("model training failed")
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
if __name__ == "__main__":
    # Script entry point: configure logging, then serve the Flask app with waitress.
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("model_training_ml log")
    from waitress import serve

    # serve() blocks until the server shuts down, so announce start-up first;
    # printed afterwards, the message would only appear on shutdown.
    print("server start!")
    serve(app, host="0.0.0.0", port=10125)
|