accuracy.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. import pandas as pd
  2. import numpy as np
  3. from pymongo import MongoClient
  4. import requests
  5. import json
  6. from datetime import datetime
  7. from flask import Flask, request
  8. import time
  9. import logging
  10. import traceback
# Flask application object; the HTTP route handler in this file is registered on it.
app = Flask('evaluation_accuracy——service')
# Endpoint of the remote accuracy-calculation service that calculate_acc posts to.
url = 'http://49.4.78.194:17160/apiCalculate/calculate'
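'''
How to use the accuracy interface:
1. The entry point is calculate_acc.
2. Pass the arguments as follows:
   data contains C_TIME (time), realValue (actual power), ableValue (available power; use the actual power when no value is available) and forecastAbleValue (forecast power).
   opt is a dict holding the required station information, with fields: cap (installed capacity), province, formulaType (formula type), electricType (plant type), stationCode (station code).
   See the interface documentation for details.
3. Formulas are evaluated either per day or per point; set opt.formulaType to choose the formula type. The per-day or per-point results are then averaged and the mean is returned.
'''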
  22. def wrap_json(df, opt):
  23. """
  24. 包装json
  25. :param df: 列名为 C_TIME realValue ableValue forecastAbleValue的DataFrame
  26. :param opt: 参数字典
  27. :return: json列表
  28. """
  29. d = opt['formulaType'].split('_')[0]
  30. jata, dfs = [], []
  31. if d == 'POINT':
  32. df['time'] = df['C_TIME'].apply(datetime_to_timestamp)
  33. for i, row in df.iterrows():
  34. dfs.append(row.to_frame().T)
  35. elif d == 'DAY':
  36. df = df.copy()
  37. # df['time'] = df['C_TIME'].apply(datetime_to_timestamp) int(round(time.mktime(df['C_TIME'].timetuple()))*1000)
  38. # df.loc[:, 'time'] = df['C_TIME'].apply(datetime_to_timestamp)
  39. df['time'] = df['C_TIME'].apply(lambda x: datetime_to_timestamp(x))
  40. # df['time'] = df.apply(lambda row: datetime_to_timestamp(row['C_TIME']), axis=1)
  41. # df['C_TIME'] = df['C_TIME'].dt.strftime('%y%m%d') # 转换成年月日
  42. df.loc[:, 'C_TIME'] = df['C_TIME'].dt.strftime('%y%m%d')
  43. for i, group in df.groupby('C_TIME'):
  44. dfs.append(group)
  45. outter_dict = {"electricCapacity": str(opt['cap']), "province": opt['province'], "formulaType": opt['formulaType'], "electricType":opt['electricType'], "stationCode": opt['stationCode']}
  46. timestamp = int(time.mktime(datetime.now().timetuple()) * 1000 + datetime.now().microsecond / 1000.0)
  47. inner_dict = {"genTime": str(timestamp)+"L", "capacity": str(opt['cap']), "openCapacity": str(opt['cap'])}
  48. for df in dfs:
  49. calculationInfoList = df.iloc[:, 1:].to_json(orient='records')
  50. outter_dict['calculationInfoList'] = [dict(calculation, **inner_dict) for calculation in eval(calculationInfoList)]
  51. jata.append(json.dumps(outter_dict))
  52. return jata
  53. def send_reqest(url, jata):
  54. """
  55. 发送请求
  56. :param url: 请求地址
  57. :param jata: Json数据
  58. :return: 准确率
  59. """
  60. headers = {
  61. 'content-type': 'application/json;charset=UTF-8',
  62. "Authorization": "dXNlcjoxMjM0NTY="
  63. }
  64. acc, number = 0, 0
  65. for i in range(len(jata)):
  66. res = requests.post(url, headers=headers, data=jata[i])
  67. if res.json()['code'] == '500':
  68. print("没通过考核标准", end=' ')
  69. continue
  70. number += 1
  71. acc += float(res.json()['data'][:-1])
  72. if number != 0:
  73. acc /= number
  74. else:
  75. print("无法迭代计算准确率平均值,分母为0")
  76. return acc
  77. def calculate_acc(data, opt):
  78. """
  79. 准确率调用接口计算
  80. :param data: 列名为 C_TIME realValue ableValue forecastAbleValue的DataFrame
  81. :param opt: 参数字段
  82. :return: 计算结果
  83. """
  84. jata = wrap_json(data, opt)
  85. acc = send_reqest(url=url, jata=jata)
  86. return acc
  87. def datetime_to_timestamp(dt):
  88. return int(round(time.mktime(dt.timetuple()))*1000)
  89. def get_data_from_mongo(args):
  90. mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table']
  91. client = MongoClient(mongodb_connection)
  92. # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
  93. db = client[mongodb_database]
  94. collection = db[mongodb_read_table] # 集合名称
  95. data_from_db = collection.find() # 这会返回一个游标(cursor)
  96. # 将游标转换为列表,并创建 pandas DataFrame
  97. df = pd.DataFrame(list(data_from_db))
  98. client.close()
  99. return df
  100. def insert_data_into_mongo(res_df,args):
  101. mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
  102. client = MongoClient(mongodb_connection)
  103. db = client[mongodb_database]
  104. if mongodb_write_table in db.list_collection_names():
  105. db[mongodb_write_table].drop()
  106. print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
  107. collection = db[mongodb_write_table] # 集合名称
  108. # 将 DataFrame 转为字典格式
  109. data_dict = res_df.to_dict("records") # 每一行作为一个字典
  110. # 插入到 MongoDB
  111. collection.insert_many(data_dict)
  112. print("data inserted successfully!")
  113. # def compute_accuracy(df,args):
  114. # col_time,col_rp,col_pp,formulaType = args['col_time'],args['col_rp'],args['col_pp'],args['formulaType'].split('_')[0]
  115. # dates = []
  116. # accuracy = []
  117. # df = df[(~np.isnan(df[col_rp]))&(~np.isnan(df[col_pp]))]
  118. # df = df[[col_time,col_rp,col_pp]].rename(columns={col_time:'C_TIME',col_rp:'realValue',col_pp:'forecastAbleValue'})
  119. # df['ableValue'] = df['realValue']
  120. # df['C_TIME'] = df['C_TIME'].apply(lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S"))
  121. # if formulaType=='DAY':
  122. # df['C_DATE'] = df['C_TIME'].apply(lambda x: x.strftime("%Y-%m-%d"))
  123. # days_list = df['C_DATE'].unique().tolist()
  124. # for day in days_list:
  125. # df_tmp = df[df['C_DATE'] == day]
  126. # dates.append(day)
  127. # accuracy.append(calculate_acc(df_tmp, args))
  128. # else:
  129. # points = df['C_TIME'].unique().tolist()
  130. # for point in points:
  131. # df_tmp = df[df['C_TIME'] == point]
  132. # dates.append(point)
  133. # accuracy.append(calculate_acc(df_tmp, args))
  134. # print("accuray compute successfully!")
  135. # return pd.DataFrame({'date':dates,'accuracy':accuracy})
  136. # 定义 RMSE 和 MAE 计算函数
  137. def rmse(y_true, y_pred):
  138. return np.sqrt(np.mean((y_true - y_pred) ** 2))
  139. def mae(y_true, y_pred):
  140. return np.mean(np.abs(y_true - y_pred))
  141. def compute_accuracy(df,args):
  142. col_time,col_rp,col_pp = args['col_time'],args['col_rp'],args['col_pp']
  143. df[col_time] = df[col_time].apply(lambda x:pd.to_datetime(x).strftime("%Y-%m-%d"))
  144. # 按日期分组并计算 RMSE 和 MAE
  145. results = df.groupby(col_time).apply(
  146. lambda group: pd.Series({
  147. "RMSE": rmse(group[col_rp], group[col_pp]),
  148. "MAE": mae(group[col_rp], group[col_pp])
  149. })
  150. ).reset_index()
  151. return results
  152. @app.route('/evaluation_accuracy', methods=['POST'])
  153. def evaluation_accuracy():
  154. # 获取程序开始时间
  155. start_time = time.time()
  156. result = {}
  157. success = 0
  158. print("Program starts execution!")
  159. try:
  160. args = request.values.to_dict()
  161. print('args',args)
  162. logger.info(args)
  163. power_df = get_data_from_mongo(args)
  164. acc_result = compute_accuracy(power_df,args)
  165. insert_data_into_mongo(acc_result,args)
  166. success = 1
  167. except Exception as e:
  168. my_exception = traceback.format_exc()
  169. my_exception.replace("\n","\t")
  170. result['msg'] = my_exception
  171. end_time = time.time()
  172. result['success'] = success
  173. result['args'] = args
  174. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  175. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  176. print("Program execution ends!")
  177. return result
  178. if __name__=="__main__":
  179. print("Program starts execution!")
  180. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  181. logger = logging.getLogger("evaluation_accuracy log")
  182. from waitress import serve
  183. serve(app, host="0.0.0.0", port=10091)
  184. print("server start!")