# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from flask import Flask, request
import time
import random
import logging
import traceback
import os
from matplotlib.pyplot import title
from common.database_dml import get_df_list_from_mongo, insert_data_into_mongo
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio
from bson.decimal128 import Decimal128
import numbers
app = Flask('analysis_report——service')
def create_fig(df_predict, col_time, label, label_pre, point):
# 创建一个图表对象
fig = go.Figure()
point_data = df_predict[df_predict['howLongAgo']==point]
# 获取所有的模型
models = df_predict['model'].unique()
# 添加实际功率曲线
fig.add_trace(go.Scatter(
x=df_predict[col_time],
y=df_predict[label],
mode='lines+markers',
name='实际功率', # 实际功率
line=dict(width=1), # 虚线
marker=dict(symbol='circle'),
))
# 为每个模型添加预测值和实际功率的曲线
for model in models:
# 筛选该模型的数据
model_data = point_data[point_data['model'] == model]
# 添加预测值曲线
fig.add_trace(go.Scatter(
x=model_data[col_time],
y=model_data[label_pre],
mode='lines+markers',
name=f'{model} 预测值', # 预测值
marker=dict(symbol='circle'),
line=dict(width=2)
))
fig_name = '超短期-第{}点'.format(point) if point < 17 else '超短期-平均值'
# 设置图表的标题和标签
fig.update_layout(
template='seaborn', # 使用 seaborn 模板
title=dict(
text=fig_name, # 标题
x=0.5, font=dict(size=20, color='darkblue') # 标题居中并设置字体大小和颜色
),
plot_bgcolor='rgba(255, 255, 255, 0.8)', # 背景色
xaxis=dict(
showgrid=True,
gridcolor='rgba(200, 200, 200, 0.5)', # 网格线颜色
title='时间', # 时间轴标题
rangeslider=dict(visible=True), # 显示滚动条
rangeselector=dict(visible=True) # 显示预设的时间范围选择器
),
yaxis=dict(
showgrid=True,
gridcolor='rgba(200, 200, 200, 0.5)',
title='功率' # y轴标题
),
legend=dict(
x=0.01,
y=0.99,
bgcolor='rgba(255, 255, 255, 0.7)', # 背景透明
bordercolor='black',
borderwidth=1,
font=dict(size=12) # 字体大小
),
hovermode='x unified', # 鼠标悬停时显示统一的提示框
hoverlabel=dict(
bgcolor='white',
font_size=14,
font_family="Rockwell", # 设置字体样式
bordercolor='black'
),
margin=dict(l=50, r=50, t=50, b=50) # 调整边距,避免标题或标签被遮挡
)
return fig
def put_analysis_report_to_html(args, df_predict, df_accuracy):
col_time = args['col_time']
label = args['label']
label_pre = args['label_pre']
farmId = args['farmId']
points = args['points'].split(',')
cdq_title = '超短期分析报告' + args.get('title', '')
acc_flag = df_accuracy.shape[0]
# 获取所有的模型
models = df_predict['model'].unique()
aves = []
# 添加超短期16个点平均值
for model in models:
# 筛选该模型的数据
model_data = df_predict[df_predict['model'] == model]
# 添加超短期16个点平均值
ave = model_data.groupby(col_time).agg({
label: 'first',
'model': 'first',
label_pre: 'mean',
'farm_id': 'first'
}).reset_index()
ave['howLongAgo'] = 17
ave = ave.reindex(columns=df_predict.columns.tolist())
aves.append(ave)
df_predict = pd.concat([df_predict]+aves)
df_predict = df_predict.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x).sort_values(by=col_time)
if acc_flag > 0:
df_accuracy = df_accuracy.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x).sort_values(by=col_time)
figs = [create_fig(df_predict, col_time, label, label_pre, int(p)) for p in points]
# 将折线图保存为 HTML 片段
power_htmls = [pio.to_html(f, full_html=False) for f in figs]
power_htmls = ["
{}
".format(html) for html in power_htmls]
# -------------------- 准确率表展示--------------------
acc_html = ''
if acc_flag > 0:
acc_html = df_accuracy.sort_values(by=col_time).to_html(classes='table table-bordered table-striped',
index=False)
# -------------------- 准确率汇总展示--------------------
summary_html = ''
if acc_flag > 0:
# 指定需要转换的列
cols_to_convert = ['MAE', 'accuracy', 'RMSE', 'deviationElectricity', 'deviationAssessment']
for col in cols_to_convert:
if col in df_accuracy.columns:
df_accuracy[col] = df_accuracy[col].apply(
lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x,
numbers.Number) else np.nan)
# 确定存在的列
agg_dict = {}
rename_cols = ['model']
if 'MAE' in df_accuracy.columns:
agg_dict['MAE'] = np.nanmean
rename_cols.append('MAE平均值')
if 'accuracy' in df_accuracy.columns:
agg_dict['accuracy'] = np.nanmean
rename_cols.append('准确率平均值')
if 'RMSE' in df_accuracy.columns:
agg_dict['RMSE'] = np.nanmean
rename_cols.append('RMSE平均值')
if 'deviationElectricity' in df_accuracy.columns:
agg_dict['deviationElectricity'] = [np.nanmean, np.nansum]
rename_cols.append('考核电量平均值')
rename_cols.append('考核总电量')
if 'deviationAssessment' in df_accuracy.columns:
agg_dict['deviationAssessment'] = [np.nanmean, np.nansum]
rename_cols.append('考核分数平均值')
rename_cols.append('考核总分数')
if 'accuracyAssessment' in df_accuracy.columns:
agg_dict['accuracyAssessment'] = [np.nanmean, np.nansum]
rename_cols.append('考核分数平均值')
rename_cols.append('考核总分数')
# 进行分组聚合,如果有需要聚合的列
summary_df = df_accuracy.groupby('model').agg(agg_dict).reset_index()
summary_df.columns = rename_cols
summary_html = summary_df.to_html(classes='table table-bordered table-striped', index=False)
# -------------------- 生成完整 HTML 页面 --------------------
html_content = f"""
Data Analysis Report
{ cdq_title }
1. 预测功率与实际功率曲线对比
{''.join(power_htmls)}
3. 准确率汇总对比
{summary_html}
"""
filename = f"{farmId}_{int(time.time() * 1000)}_{random.randint(1000, 9999)}.html"
# 保存为 HTML
directory = '/usr/share/nginx/html'
if not os.path.exists(directory):
os.makedirs(directory)
file_path = os.path.join(directory, filename)
path = f"http://ds3:10010/{filename}"
# 将 HTML 内容写入文件
with open(file_path, "w", encoding="utf-8") as f:
f.write(html_content)
print("HTML report generated successfully!")
return path
@app.route('/analysis_report_cdq', methods=['POST'])
def analysis_report():
start_time = time.time()
result = {}
success = 0
path = ""
print("Program starts execution!")
try:
args = request.values.to_dict()
print('args', args)
logger.info(args)
# 获取数据
df_predict, df_accuracy = get_df_list_from_mongo(args)[0], get_df_list_from_mongo(args)[1]
path = put_analysis_report_to_html(args, df_predict, df_accuracy)
success = 1
except Exception as e:
my_exception = traceback.format_exc()
my_exception.replace("\n", "\t")
result['msg'] = my_exception
end_time = time.time()
result['success'] = success
result['args'] = args
result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
result['file_path'] = path
print("Program execution ends!")
return result
if __name__ == "__main__":
print("Program starts execution!")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("analysis_report log")
from waitress import serve
serve(app, host="0.0.0.0", port=10108)
print("server start!")
# args_dict = {"mongodb_database": 'db_cdq', 'mongodb_read_table': 'j00234_neu_overwrite,j00234_neu_res', 'col_time': 'dateTime',
# 'label': 'C_REAL_VALUE', 'label_pre': 'power_forecast', 'farmId': 'j00234'}
# df_predict, df_accuracy = get_df_list_from_mongo(args_dict)[0], get_df_list_from_mongo(args_dict)[1]
# path = put_analysis_report_to_html(args_dict, df_predict, df_accuracy)