evaluation_accuracy.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import pandas as pd
  2. import numpy as np
  3. from pymongo import MongoClient
  4. import requests
  5. import json
  6. from datetime import datetime
  7. from flask import Flask, request
  8. import time
  9. import logging
  10. import traceback
  11. from common.database_dml import get_data_from_mongo, insert_data_into_mongo
  12. app = Flask('evaluation_accuracy——service')
  13. url = 'http://49.4.78.194:17160/apiCalculate/calculate'
  14. '''
  15. 准确率接口使用手顺:
  16. ①入口方法为 calculate_acc
  17. ② 按照参数传传值
  18. data含有C_TIME时间、realValue实际功率、ableValue可用功率(没有数值用实际功率替代)、forecastAbleValue预测功率
  19. opt为包含场站必要信息的字典,字段为:cap装机容量 province省份 formulaType公式类型 electricType电站类型 stationCode场站编码
  20. 具体介绍参考接口文档
  21. ③公式计算分为按天和按点两种,指定好opt.formulaType,即可设置公式类型,再在求取的每天或每个点的结果上进行平均,结果返回
  22. '''
  23. def wrap_json(df, opt):
  24. """
  25. 包装json
  26. :param df: 列名为 C_TIME realValue ableValue forecastAbleValue的DataFrame
  27. :param opt: 参数字典
  28. :return: json列表
  29. """
  30. d = opt['formulaType'].split('_')[0]
  31. jata, dfs = [], []
  32. if d == 'POINT':
  33. df['time'] = df['C_TIME'].apply(datetime_to_timestamp)
  34. for i, row in df.iterrows():
  35. dfs.append(row.to_frame().T)
  36. elif d == 'DAY':
  37. df = df.copy()
  38. # df['time'] = df['C_TIME'].apply(datetime_to_timestamp) int(round(time.mktime(df['C_TIME'].timetuple()))*1000)
  39. # df.loc[:, 'time'] = df['C_TIME'].apply(datetime_to_timestamp)
  40. df['time'] = df['C_TIME'].apply(lambda x: datetime_to_timestamp(x))
  41. # df['time'] = df.apply(lambda row: datetime_to_timestamp(row['C_TIME']), axis=1)
  42. # df['C_TIME'] = df['C_TIME'].dt.strftime('%y%m%d') # 转换成年月日
  43. df.loc[:, 'C_TIME'] = df['C_TIME'].dt.strftime('%y%m%d')
  44. for i, group in df.groupby('C_TIME'):
  45. dfs.append(group)
  46. outter_dict = {"electricCapacity": str(opt['cap']), "province": opt['province'], "formulaType": opt['formulaType'], "electricType":opt['electricType'], "stationCode": opt['stationCode']}
  47. timestamp = int(time.mktime(datetime.now().timetuple()) * 1000 + datetime.now().microsecond / 1000.0)
  48. inner_dict = {"genTime": str(timestamp)+"L", "capacity": str(opt['cap']), "openCapacity": str(opt['cap'])}
  49. for df in dfs:
  50. calculationInfoList = df.iloc[:, 1:].to_json(orient='records')
  51. outter_dict['calculationInfoList'] = [dict(calculation, **inner_dict) for calculation in eval(calculationInfoList)]
  52. jata.append(json.dumps(outter_dict))
  53. return jata
  54. def send_reqest(url, jata):
  55. """
  56. 发送请求
  57. :param url: 请求地址
  58. :param jata: Json数据
  59. :return: 准确率
  60. """
  61. headers = {
  62. 'content-type': 'application/json;charset=UTF-8',
  63. "Authorization": "dXNlcjoxMjM0NTY="
  64. }
  65. acc, number = 0, 0
  66. for i in range(len(jata)):
  67. res = requests.post(url, headers=headers, data=jata[i])
  68. if res.json()['code'] == '500':
  69. print("没通过考核标准", end=' ')
  70. continue
  71. number += 1
  72. acc += float(res.json()['data'][:-1])
  73. if number != 0:
  74. acc /= number
  75. else:
  76. print("无法迭代计算准确率平均值,分母为0")
  77. return acc
  78. def calculate_acc(data, opt):
  79. """
  80. 准确率调用接口计算
  81. :param data: 列名为 C_TIME realValue ableValue forecastAbleValue的DataFrame
  82. :param opt: 参数字段
  83. :return: 计算结果
  84. """
  85. jata = wrap_json(data, opt)
  86. acc = send_reqest(url=url, jata=jata)
  87. return acc
  88. def datetime_to_timestamp(dt):
  89. return int(round(time.mktime(dt.timetuple()))*1000)
  90. # 定义 RMSE 和 MAE 计算函数
  91. def rmse(y_true, y_pred):
  92. return np.sqrt(np.mean((y_true - y_pred) ** 2))
  93. def mae(y_true, y_pred):
  94. return np.mean(np.abs(y_true - y_pred))
  95. def compute_accuracy(df,args):
  96. col_time,col_rp,col_pp = args['col_time'],args['col_rp'],args['col_pp']
  97. df[col_time] = df[col_time].apply(lambda x:pd.to_datetime(x).strftime("%Y-%m-%d"))
  98. # 按日期分组并计算 RMSE 和 MAE
  99. results = df.groupby(col_time).apply(
  100. lambda group: pd.Series({
  101. "RMSE": rmse(group[col_rp], group[col_pp]),
  102. "MAE": mae(group[col_rp], group[col_pp])
  103. })
  104. ).reset_index()
  105. return results
  106. @app.route('/evaluation_accuracy', methods=['POST'])
  107. def evaluation_accuracy():
  108. # 获取程序开始时间
  109. start_time = time.time()
  110. result = {}
  111. success = 0
  112. print("Program starts execution!")
  113. try:
  114. args = request.values.to_dict()
  115. print('args',args)
  116. logger.info(args)
  117. power_df = get_data_from_mongo(args)
  118. acc_result = compute_accuracy(power_df,args)
  119. insert_data_into_mongo(acc_result,args)
  120. success = 1
  121. except Exception as e:
  122. my_exception = traceback.format_exc()
  123. my_exception.replace("\n","\t")
  124. result['msg'] = my_exception
  125. end_time = time.time()
  126. result['success'] = success
  127. result['args'] = args
  128. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  129. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  130. print("Program execution ends!")
  131. return result
  132. if __name__=="__main__":
  133. print("Program starts execution!")
  134. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  135. logger = logging.getLogger("evaluation_accuracy log")
  136. from waitress import serve
  137. serve(app, host="0.0.0.0", port=10091)
  138. print("server start!")