import pandas as pd
import numpy as np
from pymongo import MongoClient
import requests
import json
from datetime import datetime
from flask import Flask, request
import time
import logging
import traceback

app = Flask('evaluation_accuracy——service')

url = 'http://49.4.78.194:17160/apiCalculate/calculate'

# NOTE(review): credentials are hard-coded in source; consider moving them to
# configuration or the environment. Kept here so existing deployments work.
MONGODB_CONNECTION = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"

# Defined at import time so the route handler can log even when the app is
# loaded by a WSGI server instead of being run as a script.
logger = logging.getLogger("evaluation_accuracy log")

# Accuracy API usage:
#   1. The entry point is calculate_acc.
#   2. Arguments:
#        data - DataFrame with C_TIME (time), realValue (actual power),
#               ableValue (available power; use the actual power when it is
#               not available) and forecastAbleValue (forecast power).
#        opt  - dict with the required station information: cap (installed
#               capacity), province, formulaType, electricType (plant type)
#               and stationCode. See the API documentation for details.
#   3. The formula is evaluated either per day or per point, selected through
#      opt['formulaType']; the per-day / per-point results are then averaged
#      and returned.


def wrap_json(df, opt):
    """Build the JSON request payloads for the accuracy API.

    :param df: DataFrame with columns C_TIME, realValue, ableValue and
        forecastAbleValue. The first column is dropped from every payload
        (presumably C_TIME -- confirm against callers).
    :param opt: parameter dict; reads 'formulaType', 'cap', 'province',
        'electricType' and 'stationCode'.
    :return: list of JSON strings, one per point ('POINT_*' formula types) or
        one per day ('DAY_*'). Any other formula type yields an empty list.
    """
    mode = opt['formulaType'].split('_')[0]
    payloads, parts = [], []
    # Work on a copy in both modes so the caller's frame never gains a
    # 'time' column or has its C_TIME column rewritten.
    df = df.copy()
    if mode == 'POINT':
        df['time'] = df['C_TIME'].apply(datetime_to_timestamp)
        for _, row in df.iterrows():
            parts.append(row.to_frame().T)
    elif mode == 'DAY':
        df['time'] = df['C_TIME'].apply(lambda x: datetime_to_timestamp(x))
        # Reduce C_TIME to a yymmdd key so rows can be grouped by day.
        df.loc[:, 'C_TIME'] = df['C_TIME'].dt.strftime('%y%m%d')
        for _, group in df.groupby('C_TIME'):
            parts.append(group)

    outter_dict = {
        "electricCapacity": str(opt['cap']),
        "province": opt['province'],
        "formulaType": opt['formulaType'],
        "electricType": opt['electricType'],
        "stationCode": opt['stationCode'],
    }
    # Read the clock once so the seconds and microseconds parts agree.
    now = datetime.now()
    timestamp = int(time.mktime(now.timetuple()) * 1000 + now.microsecond / 1000.0)
    inner_dict = {
        "genTime": str(timestamp) + "L",
        "capacity": str(opt['cap']),
        "openCapacity": str(opt['cap']),
    }
    for part in parts:
        records_json = part.iloc[:, 1:].to_json(orient='records')
        # json.loads instead of eval: eval raised NameError on JSON
        # null/true/false (e.g. NaN values) and would execute arbitrary text.
        outter_dict['calculationInfoList'] = [
            {**record, **inner_dict} for record in json.loads(records_json)
        ]
        payloads.append(json.dumps(outter_dict))
    return payloads


def send_reqest(url, jata):
    """POST every payload to the accuracy API and average the results.

    :param url: request URL.
    :param jata: list of JSON strings as produced by wrap_json.
    :return: mean of the accuracies of all responses whose 'code' is not
        '500'; 0 when no response qualified.
    """
    headers = {
        'content-type': 'application/json;charset=UTF-8',
        "Authorization": "dXNlcjoxMjM0NTY=",
    }
    acc, number = 0, 0
    for payload in jata:
        res = requests.post(url, headers=headers, data=payload)
        body = res.json()  # parse the response once
        if body['code'] == '500':
            print("没通过考核标准", end=' ')
            continue
        number += 1
        # The last character of 'data' is stripped before conversion
        # (presumably a trailing '%' -- confirm against the API docs).
        acc += float(body['data'][:-1])
    if number != 0:
        acc /= number
    else:
        print("无法迭代计算准确率平均值,分母为0")
    return acc


def calculate_acc(data, opt):
    """Compute the accuracy through the remote accuracy API.

    :param data: DataFrame with columns C_TIME, realValue, ableValue and
        forecastAbleValue.
    :param opt: parameter dict (see wrap_json).
    :return: averaged accuracy returned by send_reqest.
    """
    jata = wrap_json(data, opt)
    acc = send_reqest(url=url, jata=jata)
    return acc


def datetime_to_timestamp(dt):
    """Convert a datetime to a millisecond timestamp via time.mktime (local time)."""
    return int(round(time.mktime(dt.timetuple())) * 1000)


def get_data_from_mongo(args):
    """Read a whole MongoDB collection into a DataFrame.

    :param args: dict with 'mongodb_database' and 'mongodb_read_table'.
    :return: DataFrame built from every document in the collection.
    """
    mongodb_database = args['mongodb_database']
    mongodb_read_table = args['mongodb_read_table']
    # The context manager closes the client even if the query raises.
    with MongoClient(MONGODB_CONNECTION) as client:
        collection = client[mongodb_database][mongodb_read_table]
        # find() returns a cursor; materialise it before the client closes.
        df = pd.DataFrame(list(collection.find()))
    return df


def insert_data_into_mongo(res_df, args):
    """Replace the contents of a MongoDB collection with the rows of res_df.

    :param res_df: DataFrame whose rows are inserted as documents.
    :param args: dict with 'mongodb_database' and 'mongodb_write_table'.
    """
    mongodb_database = args['mongodb_database']
    mongodb_write_table = args['mongodb_write_table']
    # The context manager closes the client (the original never did).
    with MongoClient(MONGODB_CONNECTION) as client:
        db = client[mongodb_database]
        if mongodb_write_table in db.list_collection_names():
            db[mongodb_write_table].drop()
            print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
        collection = db[mongodb_write_table]
        # One dict per DataFrame row.
        data_dict = res_df.to_dict("records")
        collection.insert_many(data_dict)
        print("data inserted successfully!")


# Legacy implementation that computed the accuracy through calculate_acc,
# kept for reference:
#
# def compute_accuracy(df,args):
#     col_time,col_rp,col_pp,formulaType = args['col_time'],args['col_rp'],args['col_pp'],args['formulaType'].split('_')[0]
#     dates = []
#     accuracy = []
#     df = df[(~np.isnan(df[col_rp]))&(~np.isnan(df[col_pp]))]
#     df = df[[col_time,col_rp,col_pp]].rename(columns={col_time:'C_TIME',col_rp:'realValue',col_pp:'forecastAbleValue'})
#     df['ableValue'] = df['realValue']
#     df['C_TIME'] = df['C_TIME'].apply(lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S"))
#     if formulaType=='DAY':
#         df['C_DATE'] = df['C_TIME'].apply(lambda x: x.strftime("%Y-%m-%d"))
#         days_list = df['C_DATE'].unique().tolist()
#         for day in days_list:
#             df_tmp = df[df['C_DATE'] == day]
#             dates.append(day)
#             accuracy.append(calculate_acc(df_tmp, args))
#     else:
#         points = df['C_TIME'].unique().tolist()
#         for point in points:
#             df_tmp = df[df['C_TIME'] == point]
#             dates.append(point)
#             accuracy.append(calculate_acc(df_tmp, args))
#     print("accuray compute successfully!")
#     return pd.DataFrame({'date':dates,'accuracy':accuracy})


def rmse(y_true, y_pred):
    """Return the root-mean-square error between y_true and y_pred."""
    return np.sqrt(np.mean((y_true - y_pred) ** 2))


def mae(y_true, y_pred):
    """Return the mean absolute error between y_true and y_pred."""
    return np.mean(np.abs(y_true - y_pred))


def compute_accuracy(df, args):
    """Compute per-day RMSE and MAE between two columns of df.

    :param df: input DataFrame; it is not modified.
    :param args: dict with 'col_time' (time column), 'col_rp' and 'col_pp'
        (the two value columns compared; presumably real and predicted
        power -- confirm against callers).
    :return: DataFrame with one row per day: the time column as a
        'YYYY-MM-DD' string plus 'RMSE' and 'MAE' columns.
    """
    col_time, col_rp, col_pp = args['col_time'], args['col_rp'], args['col_pp']
    # Copy first so the caller's time column is not overwritten.
    df = df.copy()
    df[col_time] = df[col_time].apply(lambda x: pd.to_datetime(x).strftime("%Y-%m-%d"))
    # Group by day; only the two value columns are handed to the metrics.
    results = df.groupby(col_time)[[col_rp, col_pp]].apply(
        lambda group: pd.Series({
            "RMSE": rmse(group[col_rp], group[col_pp]),
            "MAE": mae(group[col_rp], group[col_pp]),
        })
    ).reset_index()
    return results


@app.route('/evaluation_accuracy', methods=['POST'])
def evaluation_accuracy():
    """Read the source collection, compute daily RMSE/MAE and store the result.

    Request values are passed through as ``args`` to get_data_from_mongo,
    compute_accuracy and insert_data_into_mongo.

    :return: dict with 'success' (1 or 0), 'args', 'start_time', 'end_time'
        and, on failure, 'msg' holding the traceback with newlines replaced
        by tabs.
    """
    start_time = time.time()
    result = {}
    success = 0
    # Bound before the try so the response can be built even if reading the
    # request values fails.
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        power_df = get_data_from_mongo(args)
        acc_result = compute_accuracy(power_df, args)
        insert_data_into_mongo(acc_result, args)
        success = 1
    except Exception:
        # Top-level boundary: report the failure to the caller, don't crash.
        # str.replace returns a new string; the original discarded it.
        result['msg'] = traceback.format_exc().replace("\n", "\t")
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    from waitress import serve
    # serve() blocks until shutdown, so announce the start before calling it.
    print("server start!")
    serve(app, host="0.0.0.0", port=10091)