123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import pandas as pd
- from pymongo import MongoClient
- import pickle
- from flask import Flask,request
- import time
- import logging
- import traceback
- from common.database_dml import get_data_from_mongo,insert_data_into_mongo
- from common.alert import send_message
- from datetime import datetime, timedelta
- import pytz
- from pytz import timezone
- from common.processing_data_common import get_xxl_dq
- app = Flask('model_prediction_lightgbm——service')
def str_to_list(arg):
    """Split a comma-separated string into a list of its items.

    Args:
        arg: Comma-separated text such as ``"a,b,c"``. ``None`` and the
            empty string both mean "no items".

    Returns:
        list: The items in their original order, or ``[]`` when ``arg`` is
        empty or ``None``.
    """
    # Guard first: ''.split(',') would yield [''] instead of an empty list,
    # and None has no split() method at all.
    if not arg:
        return []
    return arg.split(',')
def forecast_data_distribution(pre_data, args):
    """Build the power forecast for a farm from NWP data.

    The stored model is used when the NWP data is usable; in every other
    case an alert is sent through ``send_message`` and the result of
    ``get_xxl_dq(farm_id, dt)`` is used instead. The fallback cases are:

    * ``pre_data`` is empty,
    * fewer than 96 rows carry tomorrow's date (Asia/Shanghai) in the
      ``col_time`` column,
    * no document with the requested ``model_name`` exists,
    * the model needs feature columns that ``pre_data`` does not have.

    Args:
        pre_data: DataFrame of NWP records. Its columns are renamed through
            the field mapping below before any other processing.
        args: Request parameters. Reads ``col_time``, ``farmId``,
            ``mongodb_database``, ``mongodb_model_table`` and ``model_name``.
            NOTE(review): ``col_time`` is looked up after the rename, so it
            presumably has to be the renamed column name -- confirm with
            callers.

    Returns:
        The forecast with ``power_forecast`` rounded to 2 decimals. On the
        model path this is a DataFrame with the columns ``farm_id``,
        ``date_time`` and ``power_forecast``; on a fallback path it is
        whatever ``get_xxl_dq`` returns.
    """
    col_time = args['col_time']
    farm_id = args['farmId']
    # Sample the clock once so dt and tomorrow cannot straddle midnight.
    now_shanghai = datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    dt = now_shanghai.strftime('%Y%m%d')
    tomorrow = (now_shanghai + timedelta(days=1)).strftime('%Y-%m-%d')
    field_mapping = {'clearsky_ghi': 'clearskyGhi', 'dni_calcd': 'dniCalcd','surface_pressure':'surfacePressure',
                     'wd140m': 'tj_wd140','ws140m': 'tj_ws140','wd170m': 'tj_wd170','cldt': 'tj_tcc','wd70m': 'tj_wd70',
                     'ws100m': 'tj_ws100','DSWRFsfc': 'tj_radiation','wd10m': 'tj_wd10','TMP2m': 'tj_t2','wd30m': 'tj_wd30',
                     'ws30m': 'tj_ws30','rh2m': 'tj_rh','PRATEsfc': 'tj_pratesfc','ws170m': 'tj_ws170','wd50m': 'tj_wd50',
                     'ws50m': 'tj_ws50','wd100m': 'tj_wd100','ws70m': 'tj_ws70','ws10m': 'tj_ws10','psz': 'tj_pressure',
                     'cldl': 'tj_lcc','pres': 'tj_pres','dateTime':'date_time'}
    # Rename the columns according to the field mapping.
    pre_data = pre_data.rename(columns=field_mapping)
    if len(pre_data) == 0:
        send_message('lightgbm预测组件', farm_id, '请注意:获取NWP数据为空,预测文件无法生成!')
        result = get_xxl_dq(farm_id, dt)
    # Plain substring match; na=False keeps rows with a missing timestamp
    # from breaking the boolean mask.
    elif len(pre_data[pre_data[col_time].str.contains(tomorrow, regex=False, na=False)]) < 96:
        send_message('lightgbm预测组件', farm_id, "日前数据记录缺失,不足96条,用DQ代替并补值!")
        result = get_xxl_dq(farm_id, dt)
    else:
        # ffill()/bfill() replace the deprecated fillna(method=...) form.
        df = pre_data.sort_values(by=col_time).ffill().bfill()
        # NOTE(security): credentials are hard-coded in source; they should
        # be moved to configuration or the environment.
        mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
        mongodb_database = args['mongodb_database']
        mongodb_model_table = args['mongodb_model_table']
        model_name = args['model_name']
        # The context manager closes the client once the model is fetched.
        with MongoClient(mongodb_connection) as client:
            collection = client[mongodb_database][mongodb_model_table]
            model_data = collection.find_one({"model_name": model_name})
        if model_data is not None:
            # This field must hold the model's serialized binary data.
            model_binary = model_data['model']
            # Deserialize the model.
            # NOTE(security): pickle.loads runs arbitrary code; the model
            # collection must only ever contain trusted data.
            model = pickle.loads(model_binary)
            features = model.feature_name()
            diff = set(features) - set(pre_data.columns)
            if diff:
                send_message('lightgbm预测组件', farm_id, f'NWP特征列缺失,使用DQ代替!features:{diff}')
                result = get_xxl_dq(farm_id, dt)
            else:
                df['power_forecast'] = model.predict(df[features])
                # Negative predictions are clamped to zero.
                df.loc[df['power_forecast'] < 0, 'power_forecast'] = 0
                print("model predict result successfully!")
                if 'farm_id' not in df.columns:
                    df['farm_id'] = farm_id
                # Copy so the rounding below writes to an independent frame
                # rather than a slice of df.
                result = df[['farm_id', 'date_time', 'power_forecast']].copy()
        else:
            send_message('lightgbm预测组件', farm_id, "模型文件缺失,用DQ代替并补值!")
            result = get_xxl_dq(farm_id, dt)
    result['power_forecast'] = round(result['power_forecast'], 2)
    return result
def model_prediction(df, args):
    """Predict power for the given rows with the model stored in MongoDB.

    Rows whose ``is_limit`` column equals ``False`` are kept when that
    column exists; otherwise all rows are used. Negative predictions are
    clamped to zero. The caller's DataFrame is not modified.

    Args:
        df: Input DataFrame. It must contain the model's feature columns
            plus ``dateTime`` and ``realPower``, which are selected for the
            returned frame.
        args: Request parameters. Reads ``mongodb_database``,
            ``mongodb_model_table``, ``model_name``, ``howLongAgo``
            (converted with ``int``) and ``farm_id``.

    Returns:
        DataFrame with the columns ``dateTime``, ``howLongAgo``, ``model``,
        ``farm_id``, ``power_forecast`` and ``realPower``.

    Raises:
        KeyError: If a required key is missing from ``args``, or no document
            with ``model_name`` exists in the model collection.
    """
    # NOTE(security): credentials are hard-coded in source; they should be
    # moved to configuration or the environment.
    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
    mongodb_database = args['mongodb_database']
    mongodb_model_table = args['mongodb_model_table']
    model_name = args['model_name']
    howLongAgo = int(args['howLongAgo'])
    farm_id = args['farm_id']
    # The context manager closes the client once the model is fetched.
    with MongoClient(mongodb_connection) as client:
        collection = client[mongodb_database][mongodb_model_table]
        model_data = collection.find_one({"model_name": model_name})
    if 'is_limit' in df.columns:
        # Explicit comparison kept on purpose: it also behaves sensibly for
        # non-boolean dtypes, unlike the ~ operator.
        df = df[df['is_limit'] == False].copy()  # noqa: E712
    else:
        # Work on a copy so the new columns do not leak into the caller's frame.
        df = df.copy()
    if model_data is None:
        # Fail with an explicit message rather than a confusing
        # "columns not in index" error from the final column selection.
        raise KeyError(f"model '{model_name}' not found in {mongodb_database}.{mongodb_model_table}")
    # This field must hold the model's serialized binary data.
    model_binary = model_data['model']
    # Deserialize the model.
    # NOTE(security): pickle.loads runs arbitrary code; the model collection
    # must only ever contain trusted data.
    model = pickle.loads(model_binary)
    df['power_forecast'] = model.predict(df[model.feature_name()])
    # Negative predictions are clamped to zero.
    df.loc[df['power_forecast'] < 0, 'power_forecast'] = 0
    df['model'] = model_name
    df['howLongAgo'] = howLongAgo
    df['farm_id'] = farm_id
    print("model predict result successfully!")
    return df[['dateTime','howLongAgo','model','farm_id','power_forecast','realPower']]
@app.route('/model_prediction_lightgbm', methods=['POST'])
def model_prediction_lightgbm():
    """Handle POST /model_prediction_lightgbm: load data, predict, store.

    The request values are passed as ``args`` to ``get_data_from_mongo``.
    ``forecast_file == 1`` routes to ``forecast_data_distribution``; any
    other value routes to ``model_prediction``. The prediction is then
    written with ``insert_data_into_mongo``.

    Returns:
        dict: ``success`` (1 on success, 0 on failure), ``args`` (the
        request parameters, ``{}`` if they could not be read),
        ``start_time`` / ``end_time`` formatted as ``%Y-%m-%d %H:%M:%S``
        in local time, and on failure ``msg`` holding the traceback with
        newlines replaced by tabs.
    """
    # Resolve the logger here so the handler also works when this module is
    # imported by a WSGI host and the __main__ block never runs.
    logger = logging.getLogger("model_prediction_lightgbm log")
    # Record the program start time.
    start_time = time.time()
    result = {}
    success = 0
    # Pre-bind args so the response can be built even if reading the
    # request itself fails.
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        forecast_file = int(args['forecast_file'])
        power_df = get_data_from_mongo(args)
        if forecast_file == 1:
            predict_data = forecast_data_distribution(power_df, args)
        else:
            predict_data = model_prediction(power_df, args)
        insert_data_into_mongo(predict_data, args)
        success = 1
    except Exception:
        # Top-level boundary: report any failure to the caller instead of
        # letting it crash the request. str.replace returns a new string,
        # so its result has to be kept.
        my_exception = traceback.format_exc().replace("\n", "\t")
        result['msg'] = my_exception
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
if __name__ == "__main__":
    # Script entry point: configure logging, then serve the Flask app with
    # waitress on all interfaces, port 10090.
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("model_prediction_lightgbm log")
    from waitress import serve
    # serve() blocks until the server shuts down, so the start-up message
    # has to be printed before the call, not after it.
    print("server start!")
    serve(app, host="0.0.0.0", port=10090)
-
|