# model_prediction_lightgbm.py
  1. import pandas as pd
  2. from pymongo import MongoClient
  3. import pickle
  4. from flask import Flask,request
  5. import time
  6. import logging
  7. import traceback
  8. from common.database_dml import get_data_from_mongo,insert_data_into_mongo
  9. from common.alert import send_message
  10. from datetime import date, timedelta
  11. app = Flask('model_prediction_lightgbm——service')
  12. def str_to_list(arg):
  13. if arg == '':
  14. return []
  15. else:
  16. return arg.split(',')
  17. def forecast_data_distribution(pre_data,args):
  18. col_time = args['col_time']
  19. tomorrow = (date.today() + timedelta(days=1)).strftime('%Y-%m-%d')
  20. field_mapping = {'clearsky_ghi': 'clearskyGhi', 'dni_calcd': 'dniCalcd','surface_pressure':'surfacePressure'}
  21. # 根据字段映射重命名列
  22. pre_data = pre_data.rename(columns=field_mapping)
  23. if len(pre_data)==0:
  24. send_message('lightgbm预测组件', args['farmId'], '请注意:获取NWP数据为空,预测文件无法生成!')
  25. result = pd.DataFrame({'farm_id':[], col_time:[], 'power_forecast':[]})
  26. elif len(pre_data[pre_data[col_time].str.contains(tomorrow)])<96:
  27. send_message('lightgbm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
  28. start_time = pre_data[col_time].min()
  29. end_time = pre_data[col_time].max()
  30. date_range = pd.date_range(start=start_time, end=end_time, freq='15T').strftime('%Y-%m-%d %H:%M:%S').tolist()
  31. df_date = pd.DataFrame({col_time:date_range})
  32. result = pd.merge(df_date,pre_data,how='left',on=col_time).sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
  33. result = result[['farm_id', 'date_time', 'power_forecast']]
  34. else:
  35. df = pre_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
  36. mongodb_connection, mongodb_database, mongodb_model_table, model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/", \
  37. args['mongodb_database'], args['mongodb_model_table'], args['model_name']
  38. client = MongoClient(mongodb_connection)
  39. db = client[mongodb_database]
  40. collection = db[mongodb_model_table]
  41. model_data = collection.find_one({"model_name": model_name})
  42. if model_data is not None:
  43. model_binary = model_data['model'] # 确保这个字段是存储模型的二进制数据
  44. # 反序列化模型
  45. model = pickle.loads(model_binary)
  46. diff = set(model.feature_name()) - set(pre_data.columns)
  47. if len(diff) > 0:
  48. send_message('lightgbm预测组件', args['farmId'], f'NWP特征列缺失,使用DQ代替!features:{diff}')
  49. result = pre_data[['farm_id', 'date_time', 'power_forecast']]
  50. else:
  51. df['power_forecast'] = model.predict(df[model.feature_name()])
  52. df.loc[df['power_forecast'] < 0, 'power_forecast'] = 0
  53. print("model predict result successfully!")
  54. result = df[['farm_id', 'date_time', 'power_forecast']]
  55. else:
  56. send_message('lightgbm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
  57. result = pre_data[['farm_id', 'date_time', 'power_forecast']]
  58. result['power_forecast'] = round(result['power_forecast'],2)
  59. return result
  60. def model_prediction(df,args):
  61. mongodb_connection,mongodb_database,mongodb_model_table,model_name,col_reserve = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_model_table'],args['model_name'],str_to_list(args['col_reserve'])
  62. client = MongoClient(mongodb_connection)
  63. db = client[mongodb_database]
  64. collection = db[mongodb_model_table]
  65. model_data = collection.find_one({"model_name": model_name})
  66. if 'is_limit' in df.columns:
  67. df = df[df['is_limit'] == False]
  68. if model_data is not None:
  69. model_binary = model_data['model'] # 确保这个字段是存储模型的二进制数据
  70. # 反序列化模型
  71. model = pickle.loads(model_binary)
  72. df['predict'] = model.predict(df[model.feature_name()])
  73. df.loc[df['predict']<0,'predict']=0
  74. df['model'] = model_name
  75. print("model predict result successfully!")
  76. features_reserve = col_reserve + ['model','predict']
  77. return df[set(features_reserve)]
  78. @app.route('/model_prediction_lightgbm', methods=['POST'])
  79. def model_prediction_lightgbm():
  80. # 获取程序开始时间
  81. start_time = time.time()
  82. result = {}
  83. success = 0
  84. print("Program starts execution!")
  85. try:
  86. args = request.values.to_dict()
  87. print('args',args)
  88. logger.info(args)
  89. forecast_file = int(args['forecast_file'])
  90. power_df = get_data_from_mongo(args)
  91. if forecast_file == 1:
  92. predict_data = forecast_data_distribution(power_df, args)
  93. else:
  94. predict_data = model_prediction(power_df, args)
  95. insert_data_into_mongo(predict_data,args)
  96. success = 1
  97. except Exception as e:
  98. my_exception = traceback.format_exc()
  99. my_exception.replace("\n","\t")
  100. result['msg'] = my_exception
  101. end_time = time.time()
  102. result['success'] = success
  103. result['args'] = args
  104. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  105. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  106. print("Program execution ends!")
  107. return result
  108. if __name__=="__main__":
  109. print("Program starts execution!")
  110. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  111. logger = logging.getLogger("model_prediction_lightgbm log")
  112. from waitress import serve
  113. serve(app, host="0.0.0.0", port=10090)
  114. print("server start!")