accuracy.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. import argparse
  2. import pandas as pd
  3. import numpy as np
  4. from pymongo import MongoClient
  5. import requests
  6. import json, time
  7. from datetime import datetime
  8. from flask import Flask,request,jsonify
  9. from waitress import serve
  10. import time
  11. import logging
  12. import traceback
# Flask application object; the /evaluation_accuracy route in this file is registered on it.
app = Flask('evaluation_accuracy——service')
# Address of the remote accuracy-calculation endpoint that calculate_acc posts payloads to.
url = 'http://49.4.78.194:17160/apiCalculate/calculate'
  15. '''
  16. 准确率接口使用手顺:
  17. ①入口方法为 calculate_acc
  18. ② 按照参数传值
  19. data含有C_TIME时间、realValue实际功率、ableValue可用功率(没有数值用实际功率替代)、forecastAbleValue预测功率
  20. opt为包含场站必要信息的字典,字段为:cap装机容量 province省份 formulaType公式类型 electricType电站类型 stationCode场站编码
  21. 具体介绍参考接口文档
  22. ③公式计算分为按天和按点两种,指定好opt.formulaType,即可设置公式类型,再在求取的每天或每个点的结果上进行平均,结果返回
  23. '''
  24. def wrap_json(df, opt):
  25. """
  26. 包装json
  27. :param df: 列名为 C_TIME realValue ableValue forecastAbleValue的DataFrame
  28. :param opt: 参数字典
  29. :return: json列表
  30. """
  31. d = opt['formulaType'].split('_')[0]
  32. jata, dfs = [], []
  33. if d == 'POINT':
  34. df['time'] = df['C_TIME'].apply(datetime_to_timestamp)
  35. for i, row in df.iterrows():
  36. dfs.append(row.to_frame().T)
  37. elif d == 'DAY':
  38. df = df.copy()
  39. # df['time'] = df['C_TIME'].apply(datetime_to_timestamp) int(round(time.mktime(df['C_TIME'].timetuple()))*1000)
  40. # df.loc[:, 'time'] = df['C_TIME'].apply(datetime_to_timestamp)
  41. df['time'] = df['C_TIME'].apply(lambda x: datetime_to_timestamp(x))
  42. # df['time'] = df.apply(lambda row: datetime_to_timestamp(row['C_TIME']), axis=1)
  43. # df['C_TIME'] = df['C_TIME'].dt.strftime('%y%m%d') # 转换成年月日
  44. df.loc[:, 'C_TIME'] = df['C_TIME'].dt.strftime('%y%m%d')
  45. for i, group in df.groupby('C_TIME'):
  46. dfs.append(group)
  47. outter_dict = {"electricCapacity": str(opt['cap']), "province": opt['province'], "formulaType": opt['formulaType'], "electricType":opt['electricType'], "stationCode": opt['stationCode']}
  48. timestamp = int(time.mktime(datetime.now().timetuple()) * 1000 + datetime.now().microsecond / 1000.0)
  49. inner_dict = {"genTime": str(timestamp)+"L", "capacity": str(opt['cap']), "openCapacity": str(opt['cap'])}
  50. for df in dfs:
  51. calculationInfoList = df.iloc[:, 1:].to_json(orient='records')
  52. outter_dict['calculationInfoList'] = [dict(calculation, **inner_dict) for calculation in eval(calculationInfoList)]
  53. jata.append(json.dumps(outter_dict))
  54. return jata
  55. def send_reqest(url, jata):
  56. """
  57. 发送请求
  58. :param url: 请求地址
  59. :param jata: Json数据
  60. :return: 准确率
  61. """
  62. headers = {
  63. 'content-type': 'application/json;charset=UTF-8',
  64. "Authorization": "dXNlcjoxMjM0NTY="
  65. }
  66. acc, number = 0, 0
  67. for i in range(len(jata)):
  68. res = requests.post(url, headers=headers, data=jata[i])
  69. if res.json()['code'] == '500':
  70. print("没通过考核标准", end=' ')
  71. continue
  72. number += 1
  73. acc += float(res.json()['data'][:-1])
  74. if number != 0:
  75. acc /= number
  76. else:
  77. print("无法迭代计算准确率平均值,分母为0")
  78. return acc
  79. def calculate_acc(data, opt):
  80. """
  81. 准确率调用接口计算
  82. :param data: 列名为 C_TIME realValue ableValue forecastAbleValue的DataFrame
  83. :param opt: 参数字段
  84. :return: 计算结果
  85. """
  86. jata = wrap_json(data, opt)
  87. acc = send_reqest(url=url, jata=jata)
  88. return acc
  89. def datetime_to_timestamp(dt):
  90. return int(round(time.mktime(dt.timetuple()))*1000)
  91. def get_data_from_mongo(args):
  92. mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table']
  93. client = MongoClient(mongodb_connection)
  94. # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
  95. db = client[mongodb_database]
  96. collection = db[mongodb_read_table] # 集合名称
  97. data_from_db = collection.find() # 这会返回一个游标(cursor)
  98. # 将游标转换为列表,并创建 pandas DataFrame
  99. df = pd.DataFrame(list(data_from_db))
  100. client.close()
  101. return df
  102. def insert_data_into_mongo(res_df,args):
  103. mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
  104. client = MongoClient(mongodb_connection)
  105. db = client[mongodb_database]
  106. if mongodb_write_table in db.list_collection_names():
  107. db[mongodb_write_table].drop()
  108. print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
  109. collection = db[mongodb_write_table] # 集合名称
  110. # 将 DataFrame 转为字典格式
  111. data_dict = res_df.to_dict("records") # 每一行作为一个字典
  112. # 插入到 MongoDB
  113. collection.insert_many(data_dict)
  114. print("data inserted successfully!")
  115. # def compute_accuracy(df,args):
  116. # col_time,col_rp,col_pp,formulaType = args['col_time'],args['col_rp'],args['col_pp'],args['formulaType'].split('_')[0]
  117. # dates = []
  118. # accuracy = []
  119. # df = df[(~np.isnan(df[col_rp]))&(~np.isnan(df[col_pp]))]
  120. # df = df[[col_time,col_rp,col_pp]].rename(columns={col_time:'C_TIME',col_rp:'realValue',col_pp:'forecastAbleValue'})
  121. # df['ableValue'] = df['realValue']
  122. # df['C_TIME'] = df['C_TIME'].apply(lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S"))
  123. # if formulaType=='DAY':
  124. # df['C_DATE'] = df['C_TIME'].apply(lambda x: x.strftime("%Y-%m-%d"))
  125. # days_list = df['C_DATE'].unique().tolist()
  126. # for day in days_list:
  127. # df_tmp = df[df['C_DATE'] == day]
  128. # dates.append(day)
  129. # accuracy.append(calculate_acc(df_tmp, args))
  130. # else:
  131. # points = df['C_TIME'].unique().tolist()
  132. # for point in points:
  133. # df_tmp = df[df['C_TIME'] == point]
  134. # dates.append(point)
  135. # accuracy.append(calculate_acc(df_tmp, args))
  136. # print("accuray compute successfully!")
  137. # return pd.DataFrame({'date':dates,'accuracy':accuracy})
  138. # 定义 RMSE 和 MAE 计算函数
  139. def rmse(y_true, y_pred):
  140. return np.sqrt(np.mean((y_true - y_pred) ** 2))
  141. def mae(y_true, y_pred):
  142. return np.mean(np.abs(y_true - y_pred))
  143. def compute_accuracy(df,args):
  144. col_time,col_rp,col_pp = args['col_time'],args['col_rp'],args['col_pp']
  145. df[col_time] = df[col_time].apply(lambda x:pd.to_datetime(x).strftime("%Y-%m-%d"))
  146. # 按日期分组并计算 RMSE 和 MAE
  147. results = df.groupby(col_time).apply(
  148. lambda group: pd.Series({
  149. "RMSE": rmse(group[col_rp], group[col_pp]),
  150. "MAE": mae(group[col_rp], group[col_pp])
  151. })
  152. ).reset_index()
  153. return results
  154. @app.route('/evaluation_accuracy', methods=['POST'])
  155. def evaluation_accuracy():
  156. # 获取程序开始时间
  157. start_time = time.time()
  158. result = {}
  159. success = 0
  160. print("Program starts execution!")
  161. try:
  162. args = request.values.to_dict()
  163. print('args',args)
  164. logger.info(args)
  165. power_df = get_data_from_mongo(args)
  166. acc_result = compute_accuracy(power_df,args)
  167. insert_data_into_mongo(acc_result,args)
  168. success = 1
  169. except Exception as e:
  170. my_exception = traceback.format_exc()
  171. my_exception.replace("\n","\t")
  172. result['msg'] = my_exception
  173. end_time = time.time()
  174. result['success'] = success
  175. result['args'] = args
  176. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  177. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  178. print("Program execution ends!")
  179. return result
  180. if __name__=="__main__":
  181. print("Program starts execution!")
  182. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  183. logger = logging.getLogger("evaluation_accuracy log")
  184. from waitress import serve
  185. serve(app, host="0.0.0.0", port=10091)
  186. print("server start!")