import logging
import os
import pickle
import time
import traceback
from datetime import datetime, timedelta

import pandas as pd
import pytz
from flask import Flask, request
from pymongo import MongoClient
from pytz import timezone

from common.alert import send_message
from common.database_dml import get_data_from_mongo, insert_data_into_mongo

app = Flask('model_prediction_lightgbm——service')

# Module-level logger so the request handler also works when this module is
# imported by another WSGI runner (it used to exist only under __main__).
logger = logging.getLogger("model_prediction_lightgbm log")

# NOTE(review): credentials are hard-coded in source. They can now be supplied
# through the MONGODB_CONNECTION environment variable; the original literal is
# kept as the default for backward compatibility.
MONGODB_CONNECTION = os.environ.get(
    "MONGODB_CONNECTION",
    "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
)

# Renames NWP columns to the names used as model features.
_NWP_FIELD_MAPPING = {
    'clearsky_ghi': 'clearskyGhi',
    'dni_calcd': 'dniCalcd',
    'surface_pressure': 'surfacePressure',
}

# Minimum number of records expected for tomorrow (96 = 24 h * 4 per hour).
_EXPECTED_DAY_AHEAD_RECORDS = 96


def str_to_list(arg):
    """Split a comma-separated string into a list; an empty string yields []."""
    if arg == '':
        return []
    return arg.split(',')


def _load_model(args):
    """Fetch and unpickle the model named ``args['model_name']`` from MongoDB.

    Reads ``args['mongodb_database']``, ``args['mongodb_model_table']`` and
    ``args['model_name']``. Returns the deserialized model, or None when no
    document with that ``model_name`` exists. The client is always closed.
    """
    client = MongoClient(MONGODB_CONNECTION)
    try:
        collection = client[args['mongodb_database']][args['mongodb_model_table']]
        model_data = collection.find_one({"model_name": args['model_name']})
    finally:
        client.close()
    if model_data is None:
        return None
    # SECURITY: pickle.loads can execute arbitrary code embedded in the blob;
    # the model collection must only be writable by trusted training jobs.
    return pickle.loads(model_data['model'])


def _tomorrow_in_shanghai():
    """Return tomorrow's date in the Asia/Shanghai timezone as 'YYYY-MM-DD'."""
    now_cn = datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    return (now_cn + timedelta(days=1)).strftime('%Y-%m-%d')


def _fill_time_gaps(pre_data, col_time):
    """Reindex ``pre_data`` onto a continuous 15-minute grid and ffill/bfill gaps.

    Assumes ``pre_data[col_time]`` holds strings formatted like
    '%Y-%m-%d %H:%M:%S' so the merge keys line up — TODO confirm with producer.
    """
    grid = pd.date_range(
        start=pre_data[col_time].min(),
        end=pre_data[col_time].max(),
        freq='15min',
    ).strftime('%Y-%m-%d %H:%M:%S').tolist()
    merged = pd.DataFrame({col_time: grid}).merge(pre_data, how='left', on=col_time)
    return merged.sort_values(by=col_time).ffill().bfill()


def _predict_with_model(pre_data, args, col_time, out_cols):
    """Predict ``power_forecast`` with the stored model, falling back to DQ.

    Falls back to the incoming ``power_forecast`` column (DQ) and sends an
    alert when the model is not found or required feature columns are missing.
    """
    df = pre_data.sort_values(by=col_time).ffill().bfill()
    model = _load_model(args)
    if model is None:
        send_message('lightgbm预测组件', args['farmId'], f"未找到模型{args['model_name']},用DQ代替!")
        return pre_data[out_cols]

    features = model.feature_name()
    missing = set(features) - set(pre_data.columns)
    if missing:
        send_message('lightgbm预测组件', args['farmId'], f'NWP特征列缺失,使用DQ代替!features:{missing}')
        return pre_data[out_cols]

    df['power_forecast'] = model.predict(df[features])
    df['power_forecast'] = df['power_forecast'].clip(lower=0)
    # Force the forecast to 0 at night (hour >= 20 or hour < 6).
    hour = pd.to_datetime(df[col_time]).dt.hour
    df.loc[(hour >= 20) | (hour < 6), 'power_forecast'] = 0
    print("model predict result successfully!")
    return df[out_cols]


def forecast_data_distribution(pre_data, args):
    """Build the day-ahead power forecast table from NWP/DQ data.

    Args:
        pre_data: DataFrame with at least ``farm_id``, ``args['col_time']``
            (string timestamps) and ``power_forecast`` (the DQ forecast).
        args: request parameters; uses ``col_time``, ``farmId`` and the
            Mongo/model keys read by ``_load_model``.

    Returns:
        DataFrame with columns ``farm_id``, ``col_time``, ``power_forecast``
        (rounded to 2 decimals). Alerts are sent via ``send_message`` when
        the input is empty, tomorrow has fewer than 96 records, or the model
        cannot be used.
    """
    col_time = args['col_time']
    out_cols = ['farm_id', col_time, 'power_forecast']
    tomorrow = _tomorrow_in_shanghai()
    pre_data = pre_data.rename(columns=_NWP_FIELD_MAPPING)

    if len(pre_data) == 0:
        send_message('lightgbm预测组件', args['farmId'], '请注意:获取NWP数据为空,预测文件无法生成!')
        result = pd.DataFrame({'farm_id': [], col_time: [], 'power_forecast': []})
    elif pre_data[col_time].str.contains(tomorrow, regex=False, na=False).sum() < _EXPECTED_DAY_AHEAD_RECORDS:
        send_message('lightgbm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
        result = _fill_time_gaps(pre_data, col_time)[out_cols]
    else:
        result = _predict_with_model(pre_data, args, col_time, out_cols)

    # Copy so the assignment never writes into a slice of the input.
    result = result.copy()
    result['power_forecast'] = result['power_forecast'].round(2)
    return result


def model_prediction(df, args):
    """Run the stored model over ``df`` and return reserved columns plus predictions.

    Rows with ``is_limit == False`` are kept when that column exists. The
    output has ``args['col_reserve']`` (comma-separated) columns followed by
    ``model`` (the model name) and ``predict`` (clipped at 0).

    Raises:
        ValueError: if no model named ``args['model_name']`` is stored.
    """
    model_name = args['model_name']
    col_reserve = str_to_list(args['col_reserve'])
    model = _load_model(args)
    if 'is_limit' in df.columns:
        # Keep only rows explicitly flagged as not limited.
        df = df[df['is_limit'] == False]  # noqa: E712 - NaN rows must be dropped too
    if model is None:
        raise ValueError(
            f"model {model_name!r} not found in "
            f"{args['mongodb_database']}.{args['mongodb_model_table']}"
        )
    # Work on a copy: avoids SettingWithCopyWarning and mutating the caller's frame.
    df = df.copy()
    df['predict'] = model.predict(df[model.feature_name()])
    df['predict'] = df['predict'].clip(lower=0)
    df['model'] = model_name
    print("model predict result successfully!")
    # De-duplicate while keeping a deterministic order (pandas >= 2 rejects sets).
    features_reserve = list(dict.fromkeys(col_reserve + ['model', 'predict']))
    return df[features_reserve]


@app.route('/model_prediction_lightgbm', methods=['POST'])
def model_prediction_lightgbm():
    """HTTP entry point: load data, predict, and write results back to Mongo.

    ``forecast_file == 1`` runs ``forecast_data_distribution``; any other
    value runs ``model_prediction``. Always returns a dict with ``success``
    (0/1), ``args``, ``start_time``, ``end_time``, plus ``msg`` (the
    traceback, newlines replaced by tabs) on failure.
    """
    start_time = time.time()
    result = {}
    success = 0
    args = {}  # defined up front so the response can be built even if parsing fails
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        forecast_file = int(args['forecast_file'])
        power_df = get_data_from_mongo(args)
        if forecast_file == 1:
            predict_data = forecast_data_distribution(power_df, args)
        else:
            predict_data = model_prediction(power_df, args)
        insert_data_into_mongo(predict_data, args)
        success = 1
    except Exception:
        # Top-level request boundary: report the failure instead of a bare 500.
        my_exception = traceback.format_exc()
        logger.error(my_exception)
        result['msg'] = my_exception.replace("\n", "\t")
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    from waitress import serve
    # serve() blocks, so announce startup before calling it.
    print("server start!")
    serve(app, host="0.0.0.0", port=10090)