"""Accuracy evaluation service.

How to use the remote accuracy interface:

1. The entry point is ``calculate_acc``.
2. Pass the arguments as follows:
   ``data`` holds C_TIME (time), realValue (actual power), ableValue
   (available power; use the actual power when no value exists) and
   forecastAbleValue (forecast power).
   ``opt`` is a dict with the required station information, with the keys
   cap (installed capacity), province, formulaType (formula type),
   electricType (station type) and stationCode (station code).
   See the interface documentation for details.
3. Formulas are evaluated either per day or per point. Setting
   ``opt['formulaType']`` selects the formula type; the per-day or per-point
   results are then averaged and returned.
"""
import json
import logging
import time
import traceback
from datetime import datetime

import numpy as np
import pandas as pd
import requests
from flask import Flask, request
from pymongo import MongoClient

from common.database_dml import get_data_from_mongo, insert_data_into_mongo

app = Flask('evaluation_accuracy——service')
url = 'http://49.4.78.194:17160/apiCalculate/calculate'

# Defined at module level so the request handler can log even when the module
# is imported by another server instead of being run as a script.
logger = logging.getLogger("evaluation_accuracy log")


def datetime_to_timestamp(dt):
    """Convert a datetime-like object to a local-time epoch in milliseconds.

    :param dt: object exposing ``timetuple()`` (e.g. ``datetime``, ``pd.Timestamp``)
    :return: integer milliseconds; sub-second precision is dropped
    """
    return int(round(time.mktime(dt.timetuple())) * 1000)


def wrap_json(df, opt):
    """Build the JSON request bodies for the accuracy service.

    One body is produced per row when ``opt['formulaType']`` starts with
    ``POINT`` and one per calendar day when it starts with ``DAY``. Any other
    prefix yields an empty list. The caller's DataFrame is not modified.

    :param df: DataFrame with columns C_TIME, realValue, ableValue and
        forecastAbleValue. The first column is dropped from the payload
        (presumably C_TIME -- verify against callers).
    :param opt: parameter dict with keys cap, province, formulaType,
        electricType and stationCode
    :return: list of JSON strings
    """
    mode = opt['formulaType'].split('_')[0]
    frames = []
    if mode == 'POINT':
        # Work on a copy so the added 'time' column does not leak to the caller.
        df = df.copy()
        df['time'] = df['C_TIME'].apply(datetime_to_timestamp)
        for _, row in df.iterrows():
            frames.append(row.to_frame().T)
    elif mode == 'DAY':
        df = df.copy()
        df['time'] = df['C_TIME'].apply(datetime_to_timestamp)
        # Reduce C_TIME to a year-month-day key so rows can be grouped by day.
        df['C_TIME'] = df['C_TIME'].dt.strftime('%y%m%d')
        for _, group in df.groupby('C_TIME'):
            frames.append(group)

    outter_dict = {
        "electricCapacity": str(opt['cap']),
        "province": opt['province'],
        "formulaType": opt['formulaType'],
        "electricType": opt['electricType'],
        "stationCode": opt['stationCode'],
    }
    # Read the clock once so the seconds and microseconds parts agree.
    now = datetime.now()
    timestamp = int(time.mktime(now.timetuple()) * 1000 + now.microsecond / 1000.0)
    inner_dict = {
        "genTime": str(timestamp) + "L",
        "capacity": str(opt['cap']),
        "openCapacity": str(opt['cap']),
    }

    jata = []
    for frame in frames:
        # json.loads instead of eval: JSON literals such as null/true/false are
        # not valid Python names, and eval would execute arbitrary text.
        records = json.loads(frame.iloc[:, 1:].to_json(orient='records'))
        outter_dict['calculationInfoList'] = [
            {**record, **inner_dict} for record in records
        ]
        jata.append(json.dumps(outter_dict))
    return jata


def send_reqest(url, jata):
    """Post every request body and average the returned accuracies.

    Responses whose ``code`` equals ``'500'`` are reported and excluded from
    the average.

    :param url: request address
    :param jata: list of JSON strings
    :return: mean accuracy, or 0 when no response could be counted
    """
    # NOTE(review): the credential is hard-coded; consider moving it to configuration.
    headers = {
        'content-type': 'application/json;charset=UTF-8',
        "Authorization": "dXNlcjoxMjM0NTY="
    }
    acc, number = 0, 0
    for payload in jata:
        res = requests.post(url, headers=headers, data=payload)
        body = res.json()  # parse once instead of once per field access
        if body['code'] == '500':
            print("没通过考核标准", end=' ')
            continue
        number += 1
        # The last character of 'data' is stripped before conversion
        # (presumably a trailing percent sign -- verify against the API).
        acc += float(body['data'][:-1])
    if number != 0:
        acc /= number
    else:
        print("无法迭代计算准确率平均值,分母为0")
    return acc


def calculate_acc(data, opt):
    """Compute the accuracy through the remote interface.

    :param data: DataFrame with columns C_TIME, realValue, ableValue and
        forecastAbleValue
    :param opt: parameter dict (see ``wrap_json``)
    :return: averaged accuracy
    """
    jata = wrap_json(data, opt)
    acc = send_reqest(url=url, jata=jata)
    return acc


# RMSE and MAE helpers
def rmse(y_true, y_pred):
    """Return the root mean squared error between two array-likes."""
    return np.sqrt(np.mean((y_true - y_pred) ** 2))


def mae(y_true, y_pred):
    """Return the mean absolute error between two array-likes."""
    return np.mean(np.abs(y_true - y_pred))


def compute_accuracy(df, args):
    """Compute the daily RMSE and MAE between two columns.

    :param df: input DataFrame; it is left unmodified
    :param args: dict with ``col_time`` (time column), ``col_rp`` and
        ``col_pp`` (the two value columns being compared)
    :return: DataFrame with one row per date and the columns RMSE and MAE
    """
    col_time, col_rp, col_pp = args['col_time'], args['col_rp'], args['col_pp']
    # Copy first so the caller's time column is not overwritten with strings.
    df = df.copy()
    df[col_time] = df[col_time].apply(
        lambda x: pd.to_datetime(x).strftime("%Y-%m-%d")
    )
    # Group by date and compute RMSE and MAE for every day.
    results = df.groupby(col_time).apply(
        lambda group: pd.Series({
            "RMSE": rmse(group[col_rp], group[col_pp]),
            "MAE": mae(group[col_rp], group[col_pp])
        })
    ).reset_index()
    return results


@app.route('/evaluation_accuracy', methods=['POST'])
def evaluation_accuracy():
    """Handle POST /evaluation_accuracy.

    Reads the request parameters, loads data through ``get_data_from_mongo``,
    computes the daily RMSE/MAE and stores them through
    ``insert_data_into_mongo``.

    :return: dict with ``success`` (1 or 0), ``args``, ``start_time``,
        ``end_time`` and, on failure, ``msg`` holding the traceback
    """
    # Record the start time of the run.
    start_time = time.time()
    result = {}
    success = 0
    # Bound before the try block so the response can be built even when
    # reading the request parameters fails.
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        power_df = get_data_from_mongo(args)
        acc_result = compute_accuracy(power_df, args)
        insert_data_into_mongo(acc_result, args)
        success = 1
    except Exception:
        my_exception = traceback.format_exc()
        # str.replace returns a new string, so the result has to be kept.
        result['msg'] = my_exception.replace("\n", "\t")
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    from waitress import serve
    # serve() blocks until shutdown, so announce the start before calling it.
    print("server start!")
    serve(app, host="0.0.0.0", port=10091)