123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
import json
import logging
import os
import time
import traceback
from datetime import datetime

import numpy as np
import pandas as pd
import requests
from flask import Flask, request
from pymongo import MongoClient
# Flask application object; the route handler further down registers on it.
app = Flask('evaluation_accuracy——service')
# Address of the remote accuracy-calculation API used by calculate_acc().
# It can be overridden per deployment through the ACCURACY_API_URL environment
# variable; the previously hard-coded address remains the default.
url = os.environ.get('ACCURACY_API_URL', 'http://49.4.78.194:17160/apiCalculate/calculate')
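'''
How to use the accuracy interface:
1. The entry point is calculate_acc.
2. Pass the arguments as follows:
   data: a DataFrame containing C_TIME (time), realValue (actual power),
         ableValue (available power; use the actual power when no value is
         available) and forecastAbleValue (forecast power).
   opt:  a dict with the required station information. Its keys are
         cap (installed capacity), province, formulaType (formula type),
         electricType (plant type) and stationCode (station code).
   See the interface documentation for details.
3. Formulas are evaluated either per day or per point. Set opt['formulaType']
   to choose the formula type; the per-day or per-point results are then
   averaged and the average is returned.
'''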
def wrap_json(df, opt):
    """
    Build the JSON request bodies for the accuracy-calculation service.

    The rows of ``df`` are split into one group per row (formula type prefix
    ``POINT``) or one group per calendar day (prefix ``DAY``); every group
    becomes one JSON document.

    :param df: DataFrame with the columns C_TIME (datetime), realValue,
        ableValue and forecastAbleValue. The frame is not modified.
    :param opt: dict with the keys cap, province, formulaType, electricType
        and stationCode
    :return: list of JSON strings, one per group; an empty list when the
        formula type prefix is neither ``POINT`` nor ``DAY``
    """
    granularity = opt['formulaType'].split('_')[0]
    if granularity not in ('POINT', 'DAY'):
        return []

    # Work on a copy in both branches so the caller's frame never gains a
    # 'time' column as a side effect.
    frame = df.copy()
    frame['time'] = frame['C_TIME'].apply(datetime_to_timestamp)

    if granularity == 'POINT':
        # One single-row frame per data point.
        groups = [frame.iloc[[pos]] for pos in range(len(frame))]
    else:
        # Group by a separate day label instead of writing strings back into
        # the datetime column, so C_TIME keeps its dtype.
        day_key = frame['C_TIME'].dt.strftime('%y%m%d')
        groups = [group for _, group in frame.groupby(day_key)]

    envelope = {
        "electricCapacity": str(opt['cap']),
        "province": opt['province'],
        "formulaType": opt['formulaType'],
        "electricType": opt['electricType'],
        "stationCode": opt['stationCode'],
    }
    # Read the clock once so the value cannot straddle a second boundary.
    generated_at = int(time.time() * 1000)
    # NOTE(review): the trailing "L" is kept from the original payload format;
    # presumably the service expects it - confirm against the API document.
    per_record = {
        "genTime": str(generated_at) + "L",
        "capacity": str(opt['cap']),
        "openCapacity": str(opt['cap']),
    }

    payloads = []
    for group in groups:
        # C_TIME is dropped by name (not by position) because 'time' carries
        # the timestamp. json.loads understands null/true/false, which
        # to_json emits for missing values and booleans; eval() does not.
        records = json.loads(group.drop(columns=['C_TIME']).to_json(orient='records'))
        body = dict(envelope)
        body['calculationInfoList'] = [dict(record, **per_record) for record in records]
        payloads.append(json.dumps(body))
    return payloads
def send_reqest(url, jata, timeout=30):
    """
    POST every JSON body to the accuracy service and average the results.

    :param url: address of the calculation endpoint
    :param jata: iterable of JSON strings, one request body each
    :param timeout: seconds to wait for each HTTP response; without a
        timeout a stalled server would block the call indefinitely
    :return: mean of the returned accuracy values over all responses whose
        code is not 500, or 0 when no response could be counted
    """
    headers = {
        'content-type': 'application/json;charset=UTF-8',
        "Authorization": "dXNlcjoxMjM0NTY="
    }
    total, counted = 0.0, 0
    for payload in jata:
        res = requests.post(url, headers=headers, data=payload, timeout=timeout)
        body = res.json()  # parse the response once and reuse it
        # Compare as text so both "500" and 500 are recognised.
        if str(body['code']) == '500':
            print("没通过考核标准", end=' ')
            continue
        counted += 1
        # The last character of 'data' is stripped before conversion
        # (presumably a percent sign - TODO confirm with the API document).
        total += float(body['data'][:-1])
    if counted == 0:
        print("无法迭代计算准确率平均值,分母为0")
        return 0
    return total / counted
def calculate_acc(data, opt):
    """
    Compute the accuracy through the remote calculation service.

    :param data: DataFrame with the columns C_TIME, realValue, ableValue and
        forecastAbleValue
    :param opt: parameter dict passed on to wrap_json
    :return: the value returned by send_reqest for the wrapped request bodies
    """
    return send_reqest(url=url, jata=wrap_json(data, opt))
def datetime_to_timestamp(dt):
    """
    Convert a datetime to a Unix timestamp in milliseconds.

    ``dt.timetuple()`` carries no sub-second part and is interpreted as local
    time by ``time.mktime``, so the result is a whole number of seconds
    expressed in milliseconds.

    :param dt: object providing ``timetuple()`` (for example a ``datetime``)
    :return: integer number of milliseconds
    """
    whole_seconds = round(time.mktime(dt.timetuple()))
    return int(whole_seconds * 1000)
def get_data_from_mongo(args):
    """
    Load every document of a MongoDB collection into a DataFrame.

    :param args: dict providing 'mongodb_database' and 'mongodb_read_table'
    :return: DataFrame built from all documents of the collection
    """
    # NOTE(review): the fallback URI embeds credentials in the source file.
    # Set MONGODB_URI in the environment, then rotate and remove the default.
    mongodb_connection = os.environ.get(
        "MONGODB_URI", "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
    )
    mongodb_database = args['mongodb_database']
    mongodb_read_table = args['mongodb_read_table']
    client = MongoClient(mongodb_connection)
    try:
        collection = client[mongodb_database][mongodb_read_table]
        # find() returns a cursor; turn it into a list while the client is
        # still open, then build the DataFrame from it.
        return pd.DataFrame(list(collection.find()))
    finally:
        # Close the connection even when the query or conversion fails.
        client.close()
-
def insert_data_into_mongo(res_df, args):
    """
    Replace the contents of a MongoDB collection with the rows of a DataFrame.

    An existing collection with the target name is dropped first; afterwards
    every row of ``res_df`` is inserted as one document.

    :param res_df: DataFrame holding the rows to store
    :param args: dict providing 'mongodb_database' and 'mongodb_write_table'
    :return: None
    """
    # NOTE(review): the fallback URI embeds credentials in the source file.
    # Set MONGODB_URI in the environment, then rotate and remove the default.
    mongodb_connection = os.environ.get(
        "MONGODB_URI", "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
    )
    mongodb_database = args['mongodb_database']
    mongodb_write_table = args['mongodb_write_table']
    client = MongoClient(mongodb_connection)
    try:
        db = client[mongodb_database]
        if mongodb_write_table in db.list_collection_names():
            db[mongodb_write_table].drop()
            print(f"Collection '{mongodb_write_table}' already exist, deleted successfully!")
        # One dict per DataFrame row.
        data_dict = res_df.to_dict("records")
        if not data_dict:
            # insert_many() rejects an empty document list, so there is
            # nothing to write for an empty result.
            print("no data to insert!")
            return
        db[mongodb_write_table].insert_many(data_dict)
        print("data inserted successfully!")
    finally:
        # The original never closed the client; release it on every path.
        client.close()
-
- # def compute_accuracy(df,args):
- # col_time,col_rp,col_pp,formulaType = args['col_time'],args['col_rp'],args['col_pp'],args['formulaType'].split('_')[0]
- # dates = []
- # accuracy = []
- # df = df[(~np.isnan(df[col_rp]))&(~np.isnan(df[col_pp]))]
- # df = df[[col_time,col_rp,col_pp]].rename(columns={col_time:'C_TIME',col_rp:'realValue',col_pp:'forecastAbleValue'})
- # df['ableValue'] = df['realValue']
- # df['C_TIME'] = df['C_TIME'].apply(lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S"))
- # if formulaType=='DAY':
- # df['C_DATE'] = df['C_TIME'].apply(lambda x: x.strftime("%Y-%m-%d"))
- # days_list = df['C_DATE'].unique().tolist()
- # for day in days_list:
- # df_tmp = df[df['C_DATE'] == day]
- # dates.append(day)
- # accuracy.append(calculate_acc(df_tmp, args))
- # else:
- # points = df['C_TIME'].unique().tolist()
- # for point in points:
- # df_tmp = df[df['C_TIME'] == point]
- # dates.append(point)
- # accuracy.append(calculate_acc(df_tmp, args))
- # print("accuray compute successfully!")
- # return pd.DataFrame({'date':dates,'accuracy':accuracy})
# RMSE and MAE helper functions
def rmse(y_true, y_pred):
    """
    Return the root-mean-square error between ``y_true`` and ``y_pred``.

    Both arguments must support element-wise subtraction with each other
    (for example NumPy arrays or pandas Series).
    """
    squared_error = (y_true - y_pred) ** 2
    mean_squared_error = np.mean(squared_error)
    return np.sqrt(mean_squared_error)
def mae(y_true, y_pred):
    """
    Return the mean absolute error between ``y_true`` and ``y_pred``.

    Both arguments must support element-wise subtraction with each other
    (for example NumPy arrays or pandas Series).
    """
    absolute_error = np.abs(y_true - y_pred)
    return np.mean(absolute_error)
-
def compute_accuracy(df, args):
    """
    Compute the daily RMSE and MAE between two value columns.

    :param df: DataFrame containing the three columns named in ``args``;
        it is left unmodified
    :param args: dict providing 'col_time' (time column), 'col_rp' (column
        passed to the metrics as y_true) and 'col_pp' (column passed as
        y_pred)
    :return: DataFrame with one row per day and the columns <col_time>
        (formatted as %Y-%m-%d), RMSE and MAE
    """
    col_time, col_rp, col_pp = args['col_time'], args['col_rp'], args['col_pp']
    # Build the day label as a separate key rather than overwriting the
    # caller's time column in place.
    day_key = df[col_time].apply(lambda value: pd.to_datetime(value).strftime("%Y-%m-%d"))
    # Only the two value columns take part in apply(); newer pandas releases
    # deprecate handing the grouping column to the applied function.
    results = df[[col_rp, col_pp]].groupby(day_key).apply(
        lambda group: pd.Series({
            "RMSE": rmse(group[col_rp], group[col_pp]),
            "MAE": mae(group[col_rp], group[col_pp]),
        })
    ).reset_index()
    return results
-
@app.route('/evaluation_accuracy', methods=['POST'])
def evaluation_accuracy():
    """
    HTTP endpoint: load data from MongoDB, compute the daily error metrics
    and write them back to MongoDB.

    The request values are passed as ``args`` to get_data_from_mongo,
    compute_accuracy and insert_data_into_mongo.

    :return: dict with 'success' (1 on success, 0 otherwise), 'args',
        'start_time', 'end_time' and, on failure, 'msg' holding the
        traceback with newlines replaced by tabs
    """
    # Look the logger up here: the module-level ``logger`` is only assigned
    # when the file is executed as a script, so relying on it would raise
    # NameError under any other launcher.
    log = logging.getLogger("evaluation_accuracy log")
    # Record the start time of the request.
    start_time = time.time()
    result = {}
    success = 0
    # Defined up front so the response can be built even when reading the
    # request values fails.
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        log.info(args)
        power_df = get_data_from_mongo(args)
        acc_result = compute_accuracy(power_df, args)
        insert_data_into_mongo(acc_result, args)
        success = 1
    except Exception:
        # str.replace returns a new string; the original discarded it, so the
        # message was never flattened onto one line.
        result['msg'] = traceback.format_exc().replace("\n", "\t")
        log.exception("evaluation_accuracy failed")
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
-
if __name__ == "__main__":
    # Script entry point: configure logging and serve the Flask app.
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("evaluation_accuracy log")
    from waitress import serve
    # serve() blocks until the server shuts down, so the start message has to
    # be printed before the call; placed after it, it only appeared at exit.
    print("server start!")
    serve(app, host="0.0.0.0", port=10091)
|