model_prediction_lightgbm.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import pandas as pd
  2. from pymongo import MongoClient
  3. import pickle
  4. from flask import Flask,request
  5. import time
  6. import logging
  7. import traceback
  8. from common.database_dml import get_data_from_mongo,insert_data_into_mongo
  9. from common.alert import send_message
  10. from datetime import date, timedelta
# Flask application object; the /model_prediction_lightgbm route is registered on it.
app = Flask('model_prediction_lightgbm——service')
  12. def str_to_list(arg):
  13. if arg == '':
  14. return []
  15. else:
  16. return arg.split(',')
  17. def forecast_data_distribution(pre_data,args):
  18. col_time = args['col_time']
  19. tomorrow = (date.today() + timedelta(days=1)).strftime('%Y-%m-%d')
  20. field_mapping = {'clearsky_ghi': 'clearskyGhi', 'dni_calcd': 'dniCalcd','surface_pressure':'surfacePressure'}
  21. # 根据字段映射重命名列
  22. pre_data = pre_data.rename(columns=field_mapping)
  23. if len(pre_data)==0:
  24. send_message('lightgbm预测组件', args['farmId'], '请注意:获取NWP数据为空,预测文件无法生成!')
  25. result = pd.DataFrame({'farm_id':[], col_time:[], 'power_forecast':[]})
  26. elif len(pre_data[pre_data[col_time].str.contains(tomorrow)])<96:
  27. send_message('lightgbm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
  28. start_time = pre_data[col_time].min()
  29. end_time = pre_data[col_time].max()
  30. date_range = pd.date_range(start=start_time, end=end_time, freq='15T').strftime('%Y-%m-%d %H:%M:%S').tolist()
  31. df_date = pd.DataFrame({col_time:date_range})
  32. result = pd.merge(df_date,pre_data,how='left',on=col_time).sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
  33. result = result[['farm_id', 'date_time', 'power_forecast']]
  34. else:
  35. df = pre_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
  36. mongodb_connection, mongodb_database, mongodb_model_table, model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/", \
  37. args['mongodb_database'], args['mongodb_model_table'], args['model_name']
  38. client = MongoClient(mongodb_connection)
  39. db = client[mongodb_database]
  40. collection = db[mongodb_model_table]
  41. model_data = collection.find_one({"model_name": model_name})
  42. if model_data is not None:
  43. model_binary = model_data['model'] # 确保这个字段是存储模型的二进制数据
  44. # 反序列化模型
  45. model = pickle.loads(model_binary)
  46. diff = set(model.feature_name()) - set(pre_data.columns)
  47. if len(diff) > 0:
  48. send_message('lightgbm预测组件', args['farmId'], f'NWP特征列缺失,使用DQ代替!features:{diff}')
  49. result = pre_data[['farm_id', 'date_time', 'power_forecast']]
  50. else:
  51. df['power_forecast'] = model.predict(df[model.feature_name()])
  52. df.loc[df['power_forecast'] < 0, 'power_forecast'] = 0
  53. # 添加小时列 把光夜间置为0
  54. df["hour"] = pd.to_datetime(df["date_time"]).dt.hour
  55. df.loc[(df["hour"] >= 20) | (df["hour"] < 6), 'power_forecast'] = 0
  56. print("model predict result successfully!")
  57. result = df[['farm_id', 'date_time', 'power_forecast']]
  58. else:
  59. send_message('lightgbm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
  60. result = pre_data[['farm_id', 'date_time', 'power_forecast']]
  61. result['power_forecast'] = round(result['power_forecast'],2)
  62. return result
  63. def model_prediction(df,args):
  64. mongodb_connection,mongodb_database,mongodb_model_table,model_name,col_reserve = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_model_table'],args['model_name'],str_to_list(args['col_reserve'])
  65. client = MongoClient(mongodb_connection)
  66. db = client[mongodb_database]
  67. collection = db[mongodb_model_table]
  68. model_data = collection.find_one({"model_name": model_name})
  69. if 'is_limit' in df.columns:
  70. df = df[df['is_limit'] == False]
  71. if model_data is not None:
  72. model_binary = model_data['model'] # 确保这个字段是存储模型的二进制数据
  73. # 反序列化模型
  74. model = pickle.loads(model_binary)
  75. df['predict'] = model.predict(df[model.feature_name()])
  76. df.loc[df['predict']<0,'predict']=0
  77. df['model'] = model_name
  78. print("model predict result successfully!")
  79. features_reserve = col_reserve + ['model','predict']
  80. return df[set(features_reserve)]
  81. @app.route('/model_prediction_lightgbm', methods=['POST'])
  82. def model_prediction_lightgbm():
  83. # 获取程序开始时间
  84. start_time = time.time()
  85. result = {}
  86. success = 0
  87. print("Program starts execution!")
  88. try:
  89. args = request.values.to_dict()
  90. print('args',args)
  91. logger.info(args)
  92. forecast_file = int(args['forecast_file'])
  93. power_df = get_data_from_mongo(args)
  94. if forecast_file == 1:
  95. predict_data = forecast_data_distribution(power_df, args)
  96. else:
  97. predict_data = model_prediction(power_df, args)
  98. insert_data_into_mongo(predict_data,args)
  99. success = 1
  100. except Exception as e:
  101. my_exception = traceback.format_exc()
  102. my_exception.replace("\n","\t")
  103. result['msg'] = my_exception
  104. end_time = time.time()
  105. result['success'] = success
  106. result['args'] = args
  107. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  108. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  109. print("Program execution ends!")
  110. return result
  111. if __name__=="__main__":
  112. print("Program starts execution!")
  113. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  114. logger = logging.getLogger("model_prediction_lightgbm log")
  115. from waitress import serve
  116. serve(app, host="0.0.0.0", port=10090)
  117. print("server start!")